Class MultiThreadedImpExImportReader
- java.lang.Object
-
- de.hybris.platform.impex.jalo.ImpExReader
-
- de.hybris.platform.impex.jalo.imp.ImpExImportReader
-
- de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader
-
- Direct Known Subclasses:
ImpExImportJob.MyMultiThreadedImpExImportReader
public class MultiThreadedImpExImportReader extends ImpExImportReader
Multi-threaded ImpEx import reader. This implementation starts multiple threads for processing lines, plus a separate thread for reading lines. Please note that line order can therefore not be guaranteed. Reader methods which rely upon line order must not be used: discardNextLine(), dumpNextLine(String) and setDumpingAllowed(boolean).
- Since:
- 3.1-u7
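The threading model described above can be illustrated with a minimal, self-contained sketch. It is not the hybris implementation and uses no hybris classes: `UnorderedImportSketch`, `importAll` and the `<END>` marker are hypothetical names chosen for illustration. One reader thread feeds a queue, several worker threads drain it, and the order of the collected results is unspecified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical model (not hybris code): one reader thread feeds several worker threads. */
class UnorderedImportSketch {

    // Assumed end marker; one is enqueued per worker so that every worker terminates.
    private static final String END = "<END>";

    /** Processes all lines with the given number of workers; the result order is unspecified. */
    static List<String> importAll(List<String> lines, int workerCount) {
        BlockingQueue<String> lineQueue = new LinkedBlockingQueue<>();
        Queue<String> results = new ConcurrentLinkedQueue<>();

        // Separate reader thread: enqueues all lines, then the end markers.
        Thread reader = new Thread(() -> {
            lineQueue.addAll(lines);
            for (int i = 0; i < workerCount; i++) {
                lineQueue.add(END);
            }
        });

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                try {
                    String line;
                    while (!END.equals(line = lineQueue.take())) {
                        results.add("imported:" + line); // stand-in for the real import
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }
        reader.start();

        try {
            reader.join();
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for import", e);
        }
        return new ArrayList<>(results);
    }
}
```

Calling `UnorderedImportSketch.importAll(lines, 3)` returns one result per input line, but two runs may return them in different orders. This is why operations that refer to "the next line" have no well-defined meaning in this mode.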
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class de.hybris.platform.impex.jalo.ImpExReader
ImpExReader.ResultSetCSVReader
-
-
Field Summary
-
Fields inherited from class de.hybris.platform.impex.jalo.ImpExReader
FIRST
-
-
Constructor Summary
Constructors
Constructor Description
MultiThreadedImpExImportReader(CSVReader reader)
MultiThreadedImpExImportReader(CSVReader reader, boolean legacyMode)
MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter)
MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, DocumentIDRegistry docIDRegistry, MultiThreadedImportProcessor processor, EnumerationValue mode)
MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor)
MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor, boolean legacyMode)
MultiThreadedImpExImportReader(java.lang.String lines)
-
Method Summary
Modifier and Type Method Description
protected void addNewWorker(PoolableThread poolableThread, int workerIndex)
protected void addResult(ImpExWorkerResult result)
protected HeaderDescriptor createNewHeader(java.util.Map<java.lang.Integer,java.lang.String> line)
Overridden to block execution until all workers have finished previous lines.
protected ImpExWorker createWorker(PoolableThread poolableThread, int number)
void discardNextLine()
Disallowed in parallel mode since line order cannot be guaranteed.
void dumpNextLine(java.lang.String reason)
Disallowed in parallel mode since line order cannot be guaranteed.
protected boolean ensureValidHeaderOrMarkUnresolved(ValueLine valueLine)
protected void execute(AbstractCodeLine line, java.util.Map csvLine, boolean forEachMode)
Called for each code line.
protected ValueLine fetchNextValueLine(ImpExWorker worker)
protected ImpExWorkerResult fetchNextWorkerResult()
protected int getAllocatedThreads()
protected de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader.InitialThreads getInitialThreads()
ImpExLogFilter getLogFilter()
int getMaxThreads()
protected void handOffToResultProcessorWorker(ImpExWorkerResult result)
protected boolean hasUnrecoverableError(ImpExWorkerResult result)
protected boolean isAllWorkerFinished()
protected boolean isInParallelMode()
protected boolean isReaderFinished()
protected boolean isResultProcessorFinished()
protected boolean mustMarkLineAsUnresolved(ImpExWorkerResult result, ValueLine line)
protected void postProcessValueLine(ValueLine currentValueLine, Item ret, java.lang.Exception error)
Now does nothing since at this time the value line is just put into the queue and no result is available yet.
protected void postProcessValueLineInternal(ValueLine currentValueLine, AbstractProcessResult ret, java.lang.Exception error)
Real post processing of lines after the worker finished importing.
protected Item processLine(ValueLine valueLine)
Now enqueues the value line to be processed by one of our workers.
protected boolean processPendingResult(ImpExWorkerResult result)
protected Item processValueLineFromWorker(ValueLine line)
Called from ImpExWorker to trigger item data processing, e.g. create, update or removal.
java.lang.Object readLine()
Fetches the next available result item.
protected boolean readLineFromWorker()
Called from ImpExReaderWorker asynchronously.
void setCurrentHeader(HeaderDescriptor header)
Overridden to switch on/off parallel mode allowed flag from ImpExConstants.Syntax.Modifier#PARALLEL.
void setDumpingAllowed(boolean dumpingAllowed)
Dumping cannot be switched off in multi-threaded mode since line order cannot be guaranteed.
void setLogFilter(ImpExLogFilter logFilter)
void setMaxThreads(int requested)
protected int tryAllocateWorkers(int amount)
protected PoolableThread tryToBorrowThread()
protected PoolableThread tryToBorrowThread(ThreadPool threadPool)
protected void writeUserRightsLines()
Overridden to wait until the line queue is empty.
-
Methods inherited from class de.hybris.platform.impex.jalo.imp.ImpExImportReader
close, createValueLine, dumpNextLine, dumpUnresolvedLine, findMarker, getAfterEachCode, getCSVWriter, getCurrentValueLine, getCustomImportProcessor, getDumpedLineCount, getDumpedLineCountPerHeader, getForEachCode, getImportProcessor, getLastImportedItem, getLastImportedItemLineNumber, getLastImportedLine, getProcessedItemsCount, getProcessedItemsCountPerHeader, getResolvedItemsCount, getScriptExecutionContext, getValidationMode, getValueLineCount, incDumpCount, incItemsCount, isCreateHMCSavedValues, isDumpingAllowed, isLegacyMode, isSecondPass, preProcessLine, processMarkerCodeLine, readAll, setAfterEachCode, setBeanShellContext, setCreateHMCSavedValues, setCSVWriter, setForEachCode, setIsSecondPass, setValueLinesToSkip
-
Methods inherited from class de.hybris.platform.impex.jalo.ImpExReader
addDefinition, addExternalDataMedia, addExternalDataMedias, addHeaderExceptionInfoAsComment, addHeaderReplacementRule, addToBeanShellContext, adjustLineIndexes, applyHeaderReplacements, assureBeanShellLoaded, checkDefinitonKey, createCodeLine, createCodeLineLegacyWay, createCodeLineModernWay, createInvalidHeader, debug, debug, enableCodeExecution, enableExternalCodeExecution, enableExternalDataCodeExecution, enableExternalImpExSyntaxParsing, enableExternalSyntaxParsing, enterIfBlock, error, error, execute, exitIfBlock, findExternalDataMedia, findExternalDataMedia, getAllExternalDataMediaCodes, getAttributeConstraintFilter, getBeforeEachCode, getCellDecorators, getCSVReader, getCurrentHeader, getCurrentLocation, getCurrentReader, getDefaultReplacements, getDocumentIDRegistry, getFromBeanShellContext, getInvalidHeaderPolicy, getLineSize, getLocale, getStrictMode, gotInsertedLines, hasCellDecorators, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeSQLData, includeSQLData, includeSQLData, info, info, initDatabase, insertLine, isCodeExecutionEnabled, isCodeLine, isCombinedSearchEnabled, isCommentLine, isDebugEnabled, isDefinition, isEmptyLine, isEndUserRights, isExternalCodeExecutionEnabled, isExternalSyntaxParsingEnabled, isHeaderLine, isIncludingExternalData, isInfoEnabled, isNotInInactiveIfBlock, isStartUserRights, legacyExecute, lineToList, modernExecute, parseHeader, parseHeader, parseHeader, parseHeader, processCodeLine, readNextCSVLine, removeExternalDataMedia, removeExternalDataMedias, replaceDefinitions, setAttributeConstraintFilter, 
setBeforeEachCode, setCellDecorators, setCurrentHeader, setInvalidHeaderPolicy, setLocale, setRelaxedMode, setRelaxedMode, setValidationMode, setValidationMode, sortRulesList, splitDefinitonCell, storeUserRightsLine, warn, warn
-
-
-
-
Constructor Detail
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(java.lang.String lines)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, boolean legacyMode)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor, boolean legacyMode)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, DocumentIDRegistry docIDRegistry, MultiThreadedImportProcessor processor, EnumerationValue mode)
-
-
Method Detail
-
setDumpingAllowed
public void setDumpingAllowed(boolean dumpingAllowed)
Dumping cannot be switched off in multi-threaded mode since line order cannot be guaranteed.
- Overrides:
setDumpingAllowed in class ImpExImportReader
-
discardNextLine
public void discardNextLine()
Disallowed in parallel mode since line order cannot be guaranteed.
- Overrides:
discardNextLine in class ImpExImportReader
-
dumpNextLine
public void dumpNextLine(java.lang.String reason)
Disallowed in parallel mode since line order cannot be guaranteed.
- Overrides:
dumpNextLine in class ImpExImportReader
- Parameters:
reason - message stored with the value line to dump, describing the reason for dumping
-
readLine
public java.lang.Object readLine() throws ImpExException
Fetches the next available result item. Please note that, because multiple threads are working in the background, more items may already have been imported at this time. Single-step importing is not possible with this class.
- Overrides:
readLine in class ImpExImportReader
- Returns:
- the next item which was processed
- Throws:
ImpExException - any import/export error except UnresolvedValueException, which is handled internally
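The "next available result" behaviour can be sketched with the standard `java.util.concurrent.ExecutorCompletionService`. This is an illustration under stated assumptions, not the hybris implementation: the class `NextAvailableResultSketch`, its `submitLine` method, and the choice to return `null` when nothing is pending are all hypothetical.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model (not hybris code): readLine() hands out whichever result finished next. */
class NextAvailableResultSketch {
    private final ExecutorService pool;
    private final CompletionService<String> completed;
    private int pending; // only touched by the single calling thread

    NextAvailableResultSketch(int threads) {
        pool = Executors.newFixedThreadPool(threads);
        completed = new ExecutorCompletionService<>(pool);
    }

    /** Hands a line to the background workers; returns immediately. */
    void submitLine(String line) {
        completed.submit(() -> "item:" + line); // stand-in for the real import
        pending++;
    }

    /** Returns the next finished result, or null if nothing is pending (assumption of this sketch). */
    String readLine() {
        if (pending == 0) {
            return null;
        }
        try {
            // take() blocks until some task has completed; other tasks may have finished as well.
            String result = completed.take().get();
            pending--;
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void close() {
        pool.shutdown();
    }
}
```

By the time one call to `readLine()` returns, the pool may already have completed several further lines, so the caller cannot step through the import one line at a time.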
-
getInitialThreads
protected de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader.InitialThreads getInitialThreads()
-
postProcessValueLine
protected final void postProcessValueLine(ValueLine currentValueLine, Item ret, java.lang.Exception error) throws ImpExException
Now does nothing since at this time the value line is just put into the queue and no result is available yet.
- Overrides:
postProcessValueLine in class ImpExImportReader
- Throws:
ImpExException
-
processLine
protected final Item processLine(ValueLine valueLine) throws ImpExException
Now enqueues the value line to be processed by one of our workers. Returns a dummy item instance - don't use it!
- Overrides:
processLine in class ImpExImportReader
- Throws:
ImpExException
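A minimal sketch of the enqueue-and-return-placeholder pattern, assuming nothing about the actual hybris internals: `EnqueueWithPlaceholderSketch`, `PLACEHOLDER` and `pollForWorker` are hypothetical names, and plain strings stand in for value lines.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical model (not hybris code): processLine() only enqueues work. */
class EnqueueWithPlaceholderSketch {

    /** Stand-in for the dummy item; it carries no imported data. */
    static final Object PLACEHOLDER = new Object();

    private final BlockingQueue<String> pendingLines = new LinkedBlockingQueue<>();

    /** Enqueues the line for a worker and returns immediately with the placeholder. */
    Object processLine(String valueLine) {
        pendingLines.add(valueLine);
        return PLACEHOLDER; // the real result does not exist yet
    }

    /** What a worker would call later to pick up work; returns null if the queue is empty. */
    String pollForWorker() {
        return pendingLines.poll();
    }
}
```

The returned object says nothing about the imported item, because the import has not happened when the method returns. The real result only becomes available after a worker has processed the queued line.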
-
writeUserRightsLines
protected void writeUserRightsLines() throws ImpExException
Overridden to wait until the line queue is empty.
- Overrides:
writeUserRightsLines in class ImpExReader
- Throws:
ImpExException - there were no user rights for import.
-
createNewHeader
protected HeaderDescriptor createNewHeader(java.util.Map<java.lang.Integer,java.lang.String> line) throws HeaderValidationException
Overridden to block execution until all workers have finished previous lines. This is necessary since header lines may contain default value expressions pointing to previous lines!
- Overrides:
createNewHeader in class ImpExReader
- Parameters:
line - line for which a header object is needed
- Returns:
- created header object
- Throws:
HeaderValidationException - header is not valid
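The "wait for all workers before creating a header" step can be sketched with a `java.util.concurrent.Phaser`. This is one possible way to build such a barrier, not necessarily how hybris does it: `HeaderBarrierSketch`, `submitValueLine` and the `importedLines` counter are hypothetical, and the sketch assumes a single submitting thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model (not hybris code): a header is only created once all prior lines are done. */
class HeaderBarrierSketch {
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // The reader thread is the one permanent party; each in-flight value line registers one more.
    private final Phaser inFlight = new Phaser(1);

    final AtomicInteger importedLines = new AtomicInteger();

    /** Hands a value line to the pool; must be called from the single reader thread. */
    void submitValueLine(String line) {
        inFlight.register();
        workers.execute(() -> {
            try {
                importedLines.incrementAndGet(); // stand-in for the real import of 'line'
            } finally {
                inFlight.arriveAndDeregister(); // this line is no longer in flight
            }
        });
    }

    /** Blocks until every previously submitted line is processed, then returns the line count. */
    int createNewHeader() {
        inFlight.arriveAndAwaitAdvance();
        // From here on, expressions that refer to previous lines see their final results.
        return importedLines.get();
    }

    void shutdown() {
        workers.shutdown();
    }
}
```

Without the barrier, a header whose default value expression refers to an earlier line could be evaluated while that line is still being processed by a worker.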
-
setCurrentHeader
public void setCurrentHeader(HeaderDescriptor header)
Overridden to switch on/off parallel mode allowed flag from ImpExConstants.Syntax.Modifier#PARALLEL.
- Overrides:
setCurrentHeader in class ImpExImportReader
- Parameters:
header - the header which will be used from now on as header for read value lines
-
readLineFromWorker
protected boolean readLineFromWorker() throws ImpExException
Called from ImpExReaderWorker asynchronously.
- Returns:
- true if file is not at end
- Throws:
ImpExException
-
processPendingResult
protected boolean processPendingResult(ImpExWorkerResult result)
-
mustMarkLineAsUnresolved
protected boolean mustMarkLineAsUnresolved(ImpExWorkerResult result, ValueLine line)
-
postProcessValueLineInternal
protected void postProcessValueLineInternal(ValueLine currentValueLine, AbstractProcessResult ret, java.lang.Exception error) throws ImpExException
Real post processing of lines after the worker finished importing.
- Parameters:
currentValueLine - the value line
ret - the import result
error -
- Throws:
ImpExException
-
ensureValidHeaderOrMarkUnresolved
protected boolean ensureValidHeaderOrMarkUnresolved(ValueLine valueLine)
- Overrides:
ensureValidHeaderOrMarkUnresolved in class ImpExImportReader
-
fetchNextValueLine
protected ValueLine fetchNextValueLine(ImpExWorker worker)
-
fetchNextWorkerResult
protected ImpExWorkerResult fetchNextWorkerResult()
-
addResult
protected void addResult(ImpExWorkerResult result)
-
hasUnrecoverableError
protected boolean hasUnrecoverableError(ImpExWorkerResult result)
-
handOffToResultProcessorWorker
protected void handOffToResultProcessorWorker(ImpExWorkerResult result)
-
processValueLineFromWorker
protected Item processValueLineFromWorker(ValueLine line) throws ImpExException
Called from ImpExWorker to trigger item data processing, e.g. create, update or removal.
- Parameters:
line -
- Returns:
- the imported item
- Throws:
ImpExException
-
isInParallelMode
protected final boolean isInParallelMode()
-
execute
protected void execute(AbstractCodeLine line, java.util.Map csvLine, boolean forEachMode) throws ImpExException
Called for each code line. If not in forEach mode, this thread now waits for all workers to finish before executing BeanShell code.
- Overrides:
execute in class ImpExReader
- Parameters:
line - the code line
csvLine - current line object for context variable 'line'
forEachMode - whether the execution is triggered by a forEach marker rather than by normal script processing
- Throws:
ImpExException - error during code execution
-
tryAllocateWorkers
protected int tryAllocateWorkers(int amount)
-
addNewWorker
protected void addNewWorker(PoolableThread poolableThread, int workerIndex)
-
createWorker
protected ImpExWorker createWorker(PoolableThread poolableThread, int number)
-
tryToBorrowThread
protected PoolableThread tryToBorrowThread()
-
tryToBorrowThread
protected PoolableThread tryToBorrowThread(ThreadPool threadPool)
-
isReaderFinished
protected boolean isReaderFinished()
-
isResultProcessorFinished
protected boolean isResultProcessorFinished()
-
isAllWorkerFinished
protected boolean isAllWorkerFinished()
-
getLogFilter
public ImpExLogFilter getLogFilter()
-
setLogFilter
public void setLogFilter(ImpExLogFilter logFilter)
-
getMaxThreads
public int getMaxThreads()
-
setMaxThreads
public void setMaxThreads(int requested)
-
getAllocatedThreads
protected int getAllocatedThreads()
- Returns:
- the allocatedThreads
-
-