Class MultiThreadedImpExImportReader
- java.lang.Object
-
- de.hybris.platform.impex.jalo.ImpExReader
-
- de.hybris.platform.impex.jalo.imp.ImpExImportReader
-
- de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader
-
- Direct Known Subclasses:
ImpExImportJob.MyMultiThreadedImpExImportReader
public class MultiThreadedImpExImportReader extends ImpExImportReader
Multi-threaded ImpEx import reader. This implementation starts multiple threads for processing lines. For reading lines a separate thread is started too. Please note that line order cannot be guaranteed now. Reader methods which rely upon line order must not be used:discardNextLine(),dumpNextLine(String)andsetDumpingAllowed(boolean).- Since:
- 3.1-u7
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class de.hybris.platform.impex.jalo.ImpExReader
ImpExReader.ResultSetCSVReader
-
-
Field Summary
-
Fields inherited from class de.hybris.platform.impex.jalo.ImpExReader
FIRST
-
-
Constructor Summary
Constructors Constructor Description MultiThreadedImpExImportReader(CSVReader reader)MultiThreadedImpExImportReader(CSVReader reader, boolean legacyMode)MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter)MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, DocumentIDRegistry docIDRegistry, MultiThreadedImportProcessor processor, EnumerationValue mode)MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor)MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor, boolean legacyMode)MultiThreadedImpExImportReader(java.lang.String lines)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description protected voidaddNewWorker(PoolableThread poolableThread, int workerIndex)protected voidaddResult(ImpExWorkerResult result)protected HeaderDescriptorcreateNewHeader(java.util.Map<java.lang.Integer,java.lang.String> line)Overwritten to block execution until all workers finished previous lines.protected ImpExWorkercreateWorker(PoolableThread poolableThread, int number)voiddiscardNextLine()Disallowed in parallel mode since line order cannot be guaranteed.voiddumpNextLine(java.lang.String reason)Disallowed in parallel mode since line order cannot be guaranteed.protected booleanensureValidHeaderOrMarkUnresolved(ValueLine valueLine)protected voidexecute(AbstractCodeLine line, java.util.Map csvLine, boolean forEachMode)Called for each code line.protected ValueLinefetchNextValueLine(ImpExWorker worker)protected ImpExWorkerResultfetchNextWorkerResult()protected intgetAllocatedThreads()protected de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader.InitialThreadsgetInitialThreads()ImpExLogFiltergetLogFilter()intgetMaxThreads()protected voidhandOffToResultProcessorWorker(ImpExWorkerResult result)protected booleanhasUnrecoverableError(ImpExWorkerResult result)protected booleanisAllWorkerFinished()protected booleanisInParallelMode()protected booleanisReaderFinished()protected booleanisResultProcessorFinished()protected booleanmustMarkLineAsUnresolved(ImpExWorkerResult result, ValueLine line)protected voidpostProcessValueLine(ValueLine currentValueLine, Item ret, java.lang.Exception error)Now does nothing since at this time the value line is just put into queue and no result is available yet.protected voidpostProcessValueLineInternal(ValueLine currentValueLine, AbstractProcessResult ret, java.lang.Exception error)Real post processing of lines after the worker finished importing.protected ItemprocessLine(ValueLine valueLine)Now enqueues the value line to be processed by one of our workers.protected booleanprocessPendingResult(ImpExWorkerResult result)protected ItemprocessValueLineFromWorker(ValueLine line)Called fromImpExWorkerto trigger item data processing e.g.java.lang.ObjectreadLine()Fetches next available result item.protected booleanreadLineFromWorker()Called fromImpExReaderWorkerasynchronously.voidsetCurrentHeader(HeaderDescriptor header)Overwritten to switch on/off parallel mode allowed flag fromImpExConstants.Syntax.Modifier#PARALLEL.voidsetDumpingAllowed(boolean dumpingAllowed)Dumping cannot be switched off in multi-threaded mode since line order cannot be guaranteed.voidsetLogFilter(ImpExLogFilter logFilter)voidsetMaxThreads(int requested)protected inttryAllocateWorkers(int amount)protected PoolableThreadtryToBorrowThread()protected PoolableThreadtryToBorrowThread(ThreadPool threadPool)protected voidwriteUserRightsLines()Overwritten to wait until line queue is empty.-
Methods inherited from class de.hybris.platform.impex.jalo.imp.ImpExImportReader
close, createValueLine, dumpNextLine, dumpUnresolvedLine, findMarker, getAfterEachCode, getCSVWriter, getCurrentValueLine, getCustomImportProcessor, getDumpedLineCount, getDumpedLineCountPerHeader, getForEachCode, getImportProcessor, getLastImportedItem, getLastImportedItemLineNumber, getLastImportedLine, getProcessedItemsCount, getProcessedItemsCountPerHeader, getResolvedItemsCount, getScriptExecutionContext, getValidationMode, getValueLineCount, incDumpCount, incItemsCount, isCreateHMCSavedValues, isDumpingAllowed, isLegacyMode, isSecondPass, preProcessLine, processMarkerCodeLine, readAll, setAfterEachCode, setBeanShellContext, setCreateHMCSavedValues, setCSVWriter, setForEachCode, setIsSecondPass, setValueLinesToSkip
-
Methods inherited from class de.hybris.platform.impex.jalo.ImpExReader
addDefinition, addExternalDataMedia, addExternalDataMedias, addHeaderExceptionInfoAsComment, addHeaderReplacementRule, addToBeanShellContext, adjustLineIndexes, applyHeaderReplacements, assureBeanShellLoaded, checkDefinitonKey, createCodeLine, createCodeLineLegacyWay, createCodeLineModernWay, createInvalidHeader, debug, debug, enableCodeExecution, enableExternalCodeExecution, enableExternalDataCodeExecution, enableExternalImpExSyntaxParsing, enableExternalSyntaxParsing, enterIfBlock, error, error, execute, exitIfBlock, findExternalDataMedia, findExternalDataMedia, getAllExternalDataMediaCodes, getAttributeConstraintFilter, getBeforeEachCode, getCellDecorators, getCSVReader, getCurrentHeader, getCurrentLocation, getCurrentReader, getDefaultReplacements, getDocumentIDRegistry, getFromBeanShellContext, getInvalidHeaderPolicy, getLineSize, getLocale, getStrictMode, gotInsertedLines, hasCellDecorators, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeSQLData, includeSQLData, includeSQLData, info, info, initDatabase, insertLine, isCodeExecutionEnabled, isCodeLine, isCombinedSearchEnabled, isCommentLine, isDebugEnabled, isDefinition, isEmptyLine, isEndUserRights, isExternalCodeExecutionEnabled, isExternalSyntaxParsingEnabled, isHeaderLine, isIncludingExternalData, isInfoEnabled, isNotInInactiveIfBlock, isStartUserRights, legacyExecute, lineToList, modernExecute, parseHeader, parseHeader, parseHeader, parseHeader, processCodeLine, readNextCSVLine, removeExternalDataMedia, removeExternalDataMedias, replaceDefinitions, setAttributeConstraintFilter, setBeforeEachCode, setCellDecorators, setCurrentHeader, setInvalidHeaderPolicy, setLocale, setRelaxedMode, setRelaxedMode, setValidationMode, setValidationMode, sortRulesList, splitDefinitonCell, storeUserRightsLine, warn, warn
-
-
-
-
Constructor Detail
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(java.lang.String lines)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, boolean legacyMode)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor, boolean legacyMode)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, DocumentIDRegistry docIDRegistry, MultiThreadedImportProcessor processor, EnumerationValue mode)
-
-
Method Detail
-
setDumpingAllowed
public void setDumpingAllowed(boolean dumpingAllowed)
Dumping cannot be switched off in multi-threaded mode since line order cannot be guaranteed.- Overrides:
setDumpingAllowedin classImpExImportReader
-
discardNextLine
public void discardNextLine()
Disallowed in parallel mode since line order cannot be guaranteed.- Overrides:
discardNextLinein classImpExImportReader
-
dumpNextLine
public void dumpNextLine(java.lang.String reason)
Disallowed in parallel mode since line order cannot be guaranteed.- Overrides:
dumpNextLinein classImpExImportReader- Parameters:
reason- message stored with value line to dump describing reason for dumping
-
readLine
public java.lang.Object readLine() throws ImpExExceptionFetches next available result item. Please note that due to multiple threads working in background there might be more items already imported at this time. Using this class it's not possible to perform single-step importing.- Overrides:
readLinein classImpExImportReader- Returns:
- the next item which was processed
- Throws:
ImpExException- any import/export error exceptUnresolvedValueException, which is handled internally
-
getInitialThreads
protected de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader.InitialThreads getInitialThreads()
-
postProcessValueLine
protected final void postProcessValueLine(ValueLine currentValueLine, Item ret, java.lang.Exception error) throws ImpExException
Now does nothing since at this time the value line is just put into queue and no result is available yet.- Overrides:
postProcessValueLinein classImpExImportReader- Throws:
ImpExException
-
processLine
protected final Item processLine(ValueLine valueLine) throws ImpExException
Now enqueues the value line to be processed by one of our workers. Returns adummyitem instance - don't use it!- Overrides:
processLinein classImpExImportReader- Throws:
ImpExException
-
writeUserRightsLines
protected void writeUserRightsLines() throws ImpExExceptionOverwritten to wait until line queue is empty.- Overrides:
writeUserRightsLinesin classImpExReader- Throws:
ImpExException- there were no user rights for import.
-
createNewHeader
protected HeaderDescriptor createNewHeader(java.util.Map<java.lang.Integer,java.lang.String> line) throws HeaderValidationException
Overwritten to block execution until all workers finished previous lines. This is necessary since header lines may contain default value expressions pointing to previous lines!- Overrides:
createNewHeaderin classImpExReader- Parameters:
line- line for which a header object is needed- Returns:
- created header object
- Throws:
HeaderValidationException- header is not valid
-
setCurrentHeader
public void setCurrentHeader(HeaderDescriptor header)
Overwritten to switch on/off parallel mode allowed flag fromImpExConstants.Syntax.Modifier#PARALLEL.- Overrides:
setCurrentHeaderin classImpExImportReader- Parameters:
header- the header which will be used from now as header for read value lines
-
readLineFromWorker
protected boolean readLineFromWorker() throws ImpExExceptionCalled fromImpExReaderWorkerasynchronously.- Returns:
- true if file is not at end
- Throws:
ImpExException
-
processPendingResult
protected boolean processPendingResult(ImpExWorkerResult result)
-
mustMarkLineAsUnresolved
protected boolean mustMarkLineAsUnresolved(ImpExWorkerResult result, ValueLine line)
-
postProcessValueLineInternal
protected void postProcessValueLineInternal(ValueLine currentValueLine, AbstractProcessResult ret, java.lang.Exception error) throws ImpExException
Real post processing of lines after the worker finished importing.- Parameters:
currentValueLine- the value lineret- the import resulterror-- Throws:
ImpExException
-
ensureValidHeaderOrMarkUnresolved
protected boolean ensureValidHeaderOrMarkUnresolved(ValueLine valueLine)
- Overrides:
ensureValidHeaderOrMarkUnresolvedin classImpExImportReader
-
fetchNextValueLine
protected ValueLine fetchNextValueLine(ImpExWorker worker)
-
fetchNextWorkerResult
protected ImpExWorkerResult fetchNextWorkerResult()
-
addResult
protected void addResult(ImpExWorkerResult result)
-
hasUnrecoverableError
protected boolean hasUnrecoverableError(ImpExWorkerResult result)
-
handOffToResultProcessorWorker
protected void handOffToResultProcessorWorker(ImpExWorkerResult result)
-
processValueLineFromWorker
protected Item processValueLineFromWorker(ValueLine line) throws ImpExException
Called fromImpExWorkerto trigger item data processing e.g. create, update or removal.- Parameters:
line-- Returns:
- the imported item
- Throws:
ImpExException
-
isInParallelMode
protected final boolean isInParallelMode()
-
execute
protected void execute(AbstractCodeLine line, java.util.Map csvLine, boolean forEachMode) throws ImpExException
Called for each code line. If not in forEach mode this thread now waits for all workers to finish before performing beanshell code.- Overrides:
executein classImpExReader- Parameters:
line- the code linecsvLine- current line object for context variable 'line'forEachMode- is the reason for execution not the normal script processing, instead it is a execution reasoned by a forEach marker?- Throws:
ImpExException- error while code execution
-
tryAllocateWorkers
protected int tryAllocateWorkers(int amount)
-
addNewWorker
protected void addNewWorker(PoolableThread poolableThread, int workerIndex)
-
createWorker
protected ImpExWorker createWorker(PoolableThread poolableThread, int number)
-
tryToBorrowThread
protected PoolableThread tryToBorrowThread()
-
tryToBorrowThread
protected PoolableThread tryToBorrowThread(ThreadPool threadPool)
-
isReaderFinished
protected boolean isReaderFinished()
-
isResultProcessorFinished
protected boolean isResultProcessorFinished()
-
isAllWorkerFinished
protected boolean isAllWorkerFinished()
-
getLogFilter
public ImpExLogFilter getLogFilter()
-
setLogFilter
public void setLogFilter(ImpExLogFilter logFilter)
-
getMaxThreads
public int getMaxThreads()
-
setMaxThreads
public void setMaxThreads(int requested)
-
getAllocatedThreads
protected int getAllocatedThreads()
- Returns:
- the allocatedThreads
-
-