Class MultiThreadedImpExImportReader

Direct Known Subclasses:
ImpExImportJob.MyMultiThreadedImpExImportReader

public class MultiThreadedImpExImportReader extends ImpExImportReader
Multi-threaded ImpEx import reader. This implementation starts multiple threads for processing lines, plus a separate thread for reading them. As a result, line order cannot be guaranteed, and reader methods that rely on line order must not be used: discardNextLine(), dumpNextLine(String) and setDumpingAllowed(boolean).
Since:
3.1-u7
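
The loss of line order described above can be illustrated with a minimal, self-contained sketch. It does not use the ImpEx API: UnorderedImportSketch, importAll and the "imported:" prefix are hypothetical names chosen for illustration, and the real reader's threading is assumed to be more involved. The sketch only shows that when several workers process lines concurrently, results arrive in completion order rather than in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a multi-threaded reader; not part of the ImpEx API.
class UnorderedImportSketch {

    // Processes every line on a pool of workers and returns the results
    // in the order in which the workers finished them.
    static List<String> importAll(List<String> lines, int workers) {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (String line : lines) {
                // Each line goes to whichever worker is free; no ordering is enforced.
                pool.execute(() -> results.add("imported:" + line));
            }
            List<String> out = new ArrayList<>();
            for (int i = 0; i < lines.size(); i++) {
                // take() blocks until some worker has finished a line.
                out.add(results.take());
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> out = importAll(List.of("line1", "line2", "line3", "line4"), 3);
        // All four lines are present, but their order may differ between runs.
        System.out.println(out);
    }
}
```

Because a caller cannot tell which line will complete next, operations such as "discard the next line" have no well-defined target, which is consistent with the restrictions listed above.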
  • Constructor Details

  • Method Details

    • setDumpingAllowed

      public void setDumpingAllowed(boolean dumpingAllowed)
      Dumping cannot be switched off in multi-threaded mode since line order cannot be guaranteed.
      Overrides:
      setDumpingAllowed in class ImpExImportReader
    • discardNextLine

      public void discardNextLine()
      Disallowed in parallel mode since line order cannot be guaranteed.
      Overrides:
      discardNextLine in class ImpExImportReader
    • dumpNextLine

      public void dumpNextLine(String reason)
      Disallowed in parallel mode since line order cannot be guaranteed.
      Overrides:
      dumpNextLine in class ImpExImportReader
      Parameters:
      reason - message stored with value line to dump describing reason for dumping
    • readLine

      public Object readLine() throws ImpExException
      Fetches the next available result item. Because multiple threads are working in the background, more items may already have been imported by the time this method returns. Single-step importing is therefore not possible with this class.
      Overrides:
      readLine in class ImpExImportReader
      Returns:
      the next item which was processed
      Throws:
      ImpExException - any import/export error except UnresolvedValueException, which is handled internally
    • getInitialThreads

      protected de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader.InitialThreads getInitialThreads()
    • postProcessValueLine

      protected final void postProcessValueLine(ValueLine currentValueLine, Item ret, Exception error) throws ImpExException
      Does nothing in this implementation: at this point the value line has only been put into the queue and no result is available yet.
      Overrides:
      postProcessValueLine in class ImpExImportReader
      Throws:
      ImpExException
    • processLine

      protected final Item processLine(ValueLine valueLine) throws ImpExException
      Enqueues the value line to be processed by one of the workers. Returns a dummy item instance that must not be used.
      Overrides:
      processLine in class ImpExImportReader
      Throws:
      ImpExException
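
      The enqueue-and-return-a-dummy behaviour described for processLine can be sketched as follows. This is only an illustration under stated assumptions: EnqueueSketch, PLACEHOLDER, workOnce and resultFor are hypothetical names, lines are plain strings instead of ValueLine objects, and the worker step is invoked by hand so that the example stays deterministic.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical names throughout; this only mirrors the enqueue-and-return-placeholder idea.
class EnqueueSketch {

    // Shared sentinel handed back to the caller instead of a real result.
    static final Object PLACEHOLDER = new Object();

    private final BlockingQueue<String> lineQueue = new LinkedBlockingQueue<>();
    private final Map<String, String> results = new ConcurrentHashMap<>();

    // Caller-facing step: queue the line and return immediately with the placeholder.
    Object processLine(String line) {
        lineQueue.add(line);
        return PLACEHOLDER;
    }

    // Worker-side step: take one queued line, do the real work, record the result.
    // Returns false when no line was queued.
    boolean workOnce() {
        String line = lineQueue.poll();
        if (line == null) {
            return false;
        }
        results.put(line, "imported:" + line);
        return true;
    }

    // Returns the recorded result for a line, or null if no worker has processed it yet.
    String resultFor(String line) {
        return results.get(line);
    }

    public static void main(String[] args) {
        EnqueueSketch reader = new EnqueueSketch();
        Object returned = reader.processLine("p1;Shirt");
        // The caller only ever sees the placeholder.
        System.out.println(returned == PLACEHOLDER);
        // No result exists yet because the worker has not run.
        System.out.println(reader.resultFor("p1;Shirt"));
        reader.workOnce();
        // The real result exists only after the worker has run.
        System.out.println(reader.resultFor("p1;Shirt"));
    }
}
```

      In this sketch the real result becomes visible only after the worker step has run, which is why post-processing cannot happen at enqueue time.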
    • writeUserRightsLines

      protected void writeUserRightsLines() throws ImpExException
      Overridden to wait until the line queue is empty.
      Overrides:
      writeUserRightsLines in class ImpExReader
      Throws:
      ImpExException - there were no user rights for import.
    • createNewHeader

      protected HeaderDescriptor createNewHeader(Map<Integer,String> line) throws HeaderValidationException
      Overridden to block execution until all workers have finished the previous lines. This is necessary because header lines may contain default value expressions that refer to previous lines.
      Overrides:
      createNewHeader in class ImpExReader
      Parameters:
      line - line for which a header object is needed
      Returns:
      created header object
      Throws:
      HeaderValidationException - header is not valid
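
      The blocking behaviour described for createNewHeader can be sketched with a simple in-flight counter. The names here (HeaderBarrierSketch, lineEnqueued, lineFinished, awaitIdle, run) are hypothetical, as is the convention that header lines start with "HEADER". How the real reader tracks pending lines is not stated in this documentation, so the sketch only demonstrates the general technique of waiting until all workers are idle before handling a header.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; class and method names are not part of the ImpEx API.
class HeaderBarrierSketch {

    private final Object lock = new Object();
    private int pendingLines = 0;

    // Called when a value line is handed to a worker.
    void lineEnqueued() {
        synchronized (lock) {
            pendingLines++;
        }
    }

    // Called by a worker once it has finished a line.
    void lineFinished() {
        synchronized (lock) {
            pendingLines--;
            if (pendingLines == 0) {
                lock.notifyAll();
            }
        }
    }

    // Blocks the calling thread until no line is in flight.
    void awaitIdle() {
        synchronized (lock) {
            while (pendingLines > 0) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for workers", e);
                }
            }
        }
    }

    // Runs a small script: value lines go to workers, header lines wait for them first.
    // Returns the lines in the order in which they were actually handled.
    static List<String> run(List<String> script, int workers) {
        HeaderBarrierSketch barrier = new HeaderBarrierSketch();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (String line : script) {
                if (line.startsWith("HEADER")) {
                    // A header may refer to earlier lines, so wait for all of them.
                    barrier.awaitIdle();
                    events.add(line);
                } else {
                    barrier.lineEnqueued();
                    pool.execute(() -> {
                        try {
                            events.add(line);
                        } finally {
                            barrier.lineFinished();
                        }
                    });
                }
            }
            barrier.awaitIdle();
            return new ArrayList<>(events);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> script = List.of("HEADER A", "a1", "a2", "a3", "HEADER B", "b1", "b2");
        // Lines between two headers may appear in any order,
        // but every header appears after all lines that precede it in the script.
        System.out.println(run(script, 3));
    }
}
```

      The counter is incremented on the reading thread before the task is submitted, so a header can never slip past a line that has been queued but not yet started.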
    • setCurrentHeader

      public void setCurrentHeader(HeaderDescriptor header)
      Overridden to switch the "parallel mode allowed" flag on or off based on ImpExConstants.Syntax.Modifier#PARALLEL.
      Overrides:
      setCurrentHeader in class ImpExImportReader
      Parameters:
      header - the header which will be used from now as header for read value lines
    • readLineFromWorker

      protected boolean readLineFromWorker() throws ImpExException
      Called from ImpExReaderWorker asynchronously.
      Returns:
      true if the end of the file has not been reached
      Throws:
      ImpExException
    • processPendingResult

      protected boolean processPendingResult(ImpExWorkerResult result)
    • mustMarkLineAsUnresolved

      protected boolean mustMarkLineAsUnresolved(ImpExWorkerResult result, ValueLine line)
    • postProcessValueLineInternal

      protected void postProcessValueLineInternal(ValueLine currentValueLine, AbstractProcessResult ret, Exception error) throws ImpExException
      Performs the actual post-processing of a line after the worker has finished importing it.
      Parameters:
      currentValueLine - the value line
      ret - the import result
      error - the error raised while importing the line, if any
      Throws:
      ImpExException
    • ensureValidHeaderOrMarkUnresolved

      protected boolean ensureValidHeaderOrMarkUnresolved(ValueLine valueLine)
      Overrides:
      ensureValidHeaderOrMarkUnresolved in class ImpExImportReader
    • fetchNextValueLine

      protected ValueLine fetchNextValueLine(ImpExWorker worker)
    • fetchNextWorkerResult

      protected ImpExWorkerResult fetchNextWorkerResult()
    • addResult

      protected void addResult(ImpExWorkerResult result)
    • hasUnrecoverableError

      protected boolean hasUnrecoverableError(ImpExWorkerResult result)
    • handOffToResultProcessorWorker

      protected void handOffToResultProcessorWorker(ImpExWorkerResult result)
    • processValueLineFromWorker

      protected Item processValueLineFromWorker(ValueLine line) throws ImpExException
      Called from ImpExWorker to trigger item data processing, e.g. creation, update or removal.
      Parameters:
      line - the value line to process
      Returns:
      the imported item
      Throws:
      ImpExException
    • isInParallelMode

      protected final boolean isInParallelMode()
    • execute

      protected void execute(AbstractCodeLine line, Map csvLine, boolean forEachMode) throws ImpExException
      Called for each code line. If not in forEach mode, this thread waits for all workers to finish before executing the BeanShell code.
      Overrides:
      execute in class ImpExReader
      Parameters:
      line - the code line
      csvLine - current line object for context variable 'line'
      forEachMode - true if the execution was triggered by a forEach marker rather than by normal script processing
      Throws:
      ImpExException - error while code execution
    • tryAllocateWorkers

      protected int tryAllocateWorkers(int amount)
    • addNewWorker

      protected void addNewWorker(PoolableThread poolableThread, int workerIndex)
    • createWorker

      protected ImpExWorker createWorker(PoolableThread poolableThread, int number)
    • tryToBorrowThread

      protected PoolableThread tryToBorrowThread()
    • tryToBorrowThread

      protected PoolableThread tryToBorrowThread(ThreadPool threadPool)
    • isReaderFinished

      protected boolean isReaderFinished()
    • isResultProcessorFinished

      protected boolean isResultProcessorFinished()
    • isAllWorkerFinished

      protected boolean isAllWorkerFinished()
    • getLogFilter

      public ImpExLogFilter getLogFilter()
    • setLogFilter

      public void setLogFilter(ImpExLogFilter logFilter)
    • getMaxThreads

      public int getMaxThreads()
    • setMaxThreads

      public void setMaxThreads(int requested)
    • getAllocatedThreads

      protected int getAllocatedThreads()
      Returns:
      the allocatedThreads
    • addReadingError

      public void addReadingError(ImpExException e, ImpExReaderWorker impExReaderWorker)