2017-10-16 19 views
1

StreamSets 데이터 수집기가 잘못된 datetime 값을 읽는 것 같습니다.Kafka에서 잘못된 시간을 읽는 StreamSets 데이터 수집기 ​​

Confluent에서 간단한 주제를 읽었습니다 : Landoop Kafka 항목으로 밀리 초 단위로 날짜/시간 값을 확인할 때 올바른 datetime을 표시하지만 StreamSet에서 Kafka Consumer 0.10.0.0으로 읽을 때 - 날짜 시간은 3 시간입니다. 그리고 20 분 더 작아. 내 실제 시간대는 GMT + 03 : 00입니다.

SDC 설정에 올바른 시간대가 있습니다. 서버 OS도 올바른 시간대를 갖습니다. SDC 기능의 now()가 올바른 결과를 제공합니다.

그 이유는 무엇입니까?

UPDATE - 내 워크 플로우

Mac 용 도커에서 서비스를 실행할 때 나는 매우 비슷한 문제가 발생했습니다
{ 
    "pipelineConfig" : { 
    "schemaVersion" : 4, 
    "version" : 7, 
    "pipelineId" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb", 
    "title" : "connector_test_data_file2", 
    "description" : "", 
    "uuid" : "13fea9f9-5253-432b-8bf4-09dfbe763dcd", 
    "configuration" : [ { 
     "name" : "executionMode", 
     "value" : "STANDALONE" 
    }, { 
     "name" : "deliveryGuarantee", 
     "value" : "AT_LEAST_ONCE" 
    }, { 
     "name" : "startEventStage", 
     "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1" 
    }, { 
     "name" : "stopEventStage", 
     "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1" 
    }, { 
     "name" : "shouldRetry", 
     "value" : true 
    }, { 
     "name" : "retryAttempts", 
     "value" : -1 
    }, { 
     "name" : "memoryLimit", 
     "value" : "${jvm:maxMemoryMB() * 0.65}" 
    }, { 
     "name" : "memoryLimitExceeded", 
     "value" : "STOP_PIPELINE" 
    }, { 
     "name" : "notifyOnStates", 
     "value" : [ "RUN_ERROR", "STOPPED", "FINISHED" ] 
    }, { 
     "name" : "emailIDs", 
     "value" : [ ] 
    }, { 
     "name" : "constants", 
     "value" : [ ] 
    }, { 
     "name" : "badRecordsHandling", 
     "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_recordstolocalfilesystem_ToErrorLocalFSDTarget::1" 
    }, { 
     "name" : "errorRecordPolicy", 
     "value" : "ORIGINAL_RECORD" 
    }, { 
     "name" : "workerCount", 
     "value" : 0 
    }, { 
     "name" : "clusterSlaveMemory", 
     "value" : 1024 
    }, { 
     "name" : "clusterSlaveJavaOpts", 
     "value" : "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Dlog4j.debug" 
    }, { 
     "name" : "clusterLauncherEnv", 
     "value" : [ ] 
    }, { 
     "name" : "mesosDispatcherURL", 
     "value" : null 
    }, { 
     "name" : "hdfsS3ConfDir", 
     "value" : null 
    }, { 
     "name" : "rateLimit", 
     "value" : 0 
    }, { 
     "name" : "maxRunners", 
     "value" : 0 
    }, { 
     "name" : "webhookConfigs", 
     "value" : [ ] 
    }, { 
     "name" : "sparkConfigs", 
     "value" : [ ] 
    }, { 
     "name" : "statsAggregatorStage", 
     "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget::1" 
    } ], 
    "uiInfo" : { 
     "previewConfig" : { 
     "previewSource" : "CONFIGURED_SOURCE", 
     "batchSize" : "1000", 
     "timeout" : 10000, 
     "writeToDestinations" : false, 
     "executeLifecycleEvents" : false, 
     "showHeader" : false, 
     "showFieldType" : true, 
     "rememberMe" : false 
     } 
    }, 
    "stages" : [ { 
     "instanceName" : "KafkaConsumer_01", 
     "library" : "streamsets-datacollector-apache-kafka_0_10-lib", 
     "stageName" : "com_streamsets_pipeline_stage_origin_kafka_KafkaDSource", 
     "stageVersion" : "5", 
     "configuration" : [ { 
     "name" : "kafkaConfigBean.dataFormatConfig.filePatternInArchive", 
     "value" : "*" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.charset", 
     "value" : "UTF-8" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.removeCtrlChars", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.textMaxLineLen", 
     "value" : 1024 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.useCustomDelimiter", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.customDelimiter", 
     "value" : "\\r\\n" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.includeCustomDelimiterInTheText", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.jsonContent", 
     "value" : "MULTIPLE_OBJECTS" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.jsonMaxObjectLen", 
     "value" : 4096 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvFileFormat", 
     "value" : "CSV" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvHeader", 
     "value" : "NO_HEADER" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvMaxObjectLen", 
     "value" : 1024 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvAllowExtraColumns", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvExtraColumnPrefix", 
     "value" : "_extra_" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvCustomDelimiter", 
     "value" : "|" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvCustomEscape", 
     "value" : "\\" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvCustomQuote", 
     "value" : "\"" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvEnableComments", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvCommentMarker", 
     "value" : "#" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvIgnoreEmptyLines", 
     "value" : true 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvRecordType", 
     "value" : "LIST_MAP" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.csvSkipStartLines", 
     "value" : 0 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.parseNull", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.nullConstant", 
     "value" : "\\\\N" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.xmlRecordElement", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.includeFieldXpathAttributes", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.xPathNamespaceContext", 
     "value" : [ ] 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.outputFieldAttributes", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.xmlMaxObjectLen", 
     "value" : 4096 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.logMode", 
     "value" : "COMMON_LOG_FORMAT" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.logMaxObjectLen", 
     "value" : 1024 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.retainOriginalLine", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.customLogFormat", 
     "value" : "%h %l %u %t \"%r\" %>s %b" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.regex", 
     "value" : "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.fieldPathsToGroupName", 
     "value" : [ { 
      "fieldPath" : "/", 
      "group" : 1 
     } ] 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.grokPatternDefinition", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.grokPattern", 
     "value" : "%{COMMONAPACHELOG}" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.onParseError", 
     "value" : "ERROR" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.maxStackTraceLines", 
     "value" : 50 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.enableLog4jCustomLogFormat", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.log4jCustomLogFormat", 
     "value" : "%r [%t] %-5p %c %x - %m%n" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.avroSchemaSource", 
     "value" : "REGISTRY" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.avroSchema", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.schemaRegistryUrls", 
     "value" : [ "http://test-kafka01.mytest.com:8081" ] 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.schemaLookupMode", 
     "value" : "AUTO" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.subject", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.schemaId", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.protoDescriptorFile", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.messageType", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.isDelimited", 
     "value" : true 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.binaryMaxObjectLen", 
     "value" : 1024 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.datagramMode", 
     "value" : "SYSLOG" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.typesDbPath", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.convertTime", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.excludeInterval", 
     "value" : true 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.authFilePath", 
     "value" : null 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.wholeFileMaxObjectLen", 
     "value" : 8192 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.rateLimit", 
     "value" : "-1" 
     }, { 
     "name" : "kafkaConfigBean.dataFormatConfig.verifyChecksum", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.dataFormat", 
     "value" : "AVRO" 
     }, { 
     "name" : "kafkaConfigBean.metadataBrokerList", 
     "value" : "test-kafka01:9092,test-kafka02:9092,test-kafka03:9092" 
     }, { 
     "name" : "kafkaConfigBean.zookeeperConnect", 
     "value" : "test-kafka01:2181,test-kafka02:2181,test-kafka03:2181" 
     }, { 
     "name" : "kafkaConfigBean.consumerGroup", 
     "value" : "connector_elastic" 
     }, { 
     "name" : "kafkaConfigBean.topic", 
     "value" : "test_data" 
     }, { 
     "name" : "kafkaConfigBean.produceSingleRecordPerMessage", 
     "value" : false 
     }, { 
     "name" : "kafkaConfigBean.maxBatchSize", 
     "value" : 1000 
     }, { 
     "name" : "kafkaConfigBean.maxWaitTime", 
     "value" : 2000 
     }, { 
     "name" : "kafkaConfigBean.maxRatePerPartition", 
     "value" : 1000 
     }, { 
     "name" : "kafkaConfigBean.keyDeserializer", 
     "value" : "CONFLUENT" 
     }, { 
     "name" : "kafkaConfigBean.valueDeserializer", 
     "value" : "CONFLUENT" 
     }, { 
     "name" : "kafkaConfigBean.kafkaConsumerConfigs", 
     "value" : [ ] 
     }, { 
     "name" : "stageOnRecordError", 
     "value" : "TO_ERROR" 
     } ], 
     "uiInfo" : { 
     "yPos" : 40.85545349121094, 
     "stageType" : "SOURCE", 
     "rawSource" : { 
      "configuration" : [ { 
      "name" : "brokerHost", 
      "value" : "localhost" 
      }, { 
      "name" : "brokerPort", 
      "value" : 9092 
      }, { 
      "name" : "topic", 
      "value" : "myTopic" 
      }, { 
      "name" : "partition", 
      "value" : 0 
      }, { 
      "name" : "maxWaitTime", 
      "value" : 1000 
      } ] 
     }, 
     "description" : "", 
     "label" : "input", 
     "xPos" : 184.59434509277344 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ "KafkaConsumer_01OutputLane15081523550290" ], 
     "eventLanes" : [ ] 
    }, { 
     "instanceName" : "LocalFS_01", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_localfilesystem_LocalFileSystemDTarget", 
     "stageVersion" : "3", 
     "configuration" : [ { 
     "name" : "configs.uniquePrefix", 
     "value" : "sdc-${sdc:id()}" 
     }, { 
     "name" : "configs.fileNameSuffix", 
     "value" : null 
     }, { 
     "name" : "configs.dirPathTemplateInHeader", 
     "value" : false 
     }, { 
     "name" : "configs.dirPathTemplate", 
     "value" : "/tmp/data_monitor/out/success/${YYYY()}-${MM()}-${DD()}-${hh()}" 
     }, { 
     "name" : "configs.timeZoneID", 
     "value" : "Europe/Moscow" 
     }, { 
     "name" : "configs.timeDriver", 
     "value" : "${time:now()}" 
     }, { 
     "name" : "configs.maxRecordsPerFile", 
     "value" : 0 
     }, { 
     "name" : "configs.maxFileSize", 
     "value" : 0 
     }, { 
     "name" : "configs.idleTimeout", 
     "value" : "${1 * HOURS}" 
     }, { 
     "name" : "configs.compression", 
     "value" : "NONE" 
     }, { 
     "name" : "configs.otherCompression", 
     "value" : null 
     }, { 
     "name" : "configs.fileType", 
     "value" : "TEXT" 
     }, { 
     "name" : "configs.keyEl", 
     "value" : "${uuid()}" 
     }, { 
     "name" : "configs.lateRecordsLimit", 
     "value" : "${1 * HOURS}" 
     }, { 
     "name" : "configs.rollIfHeader", 
     "value" : false 
     }, { 
     "name" : "configs.rollHeaderName", 
     "value" : "roll" 
     }, { 
     "name" : "configs.lateRecordsAction", 
     "value" : "SEND_TO_ERROR" 
     }, { 
     "name" : "configs.lateRecordsDirPathTemplate", 
     "value" : "/tmp/late/${YYYY()}-${MM()}-${DD()}" 
     }, { 
     "name" : "configs.dataFormat", 
     "value" : "JSON" 
     }, { 
     "name" : "configs.hdfsPermissionCheck", 
     "value" : true 
     }, { 
     "name" : "configs.permissionEL", 
     "value" : null 
     }, { 
     "name" : "configs.skipOldTempFileRecovery", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.charset", 
     "value" : "UTF-8" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvFileFormat", 
     "value" : "CSV" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvHeader", 
     "value" : "NO_HEADER" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvReplaceNewLines", 
     "value" : true 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvReplaceNewLinesString", 
     "value" : " " 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvCustomDelimiter", 
     "value" : "|" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvCustomEscape", 
     "value" : "\\" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.csvCustomQuote", 
     "value" : "\"" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.jsonMode", 
     "value" : "MULTIPLE_OBJECTS" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.textFieldPath", 
     "value" : "/text" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.textRecordSeparator", 
     "value" : "\\n" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.textFieldMissingAction", 
     "value" : "ERROR" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.textEmptyLineIfNull", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.avroSchemaSource", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.avroSchema", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.registerSchema", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.schemaRegistryUrlsForRegistration", 
     "value" : [ ] 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.schemaRegistryUrls", 
     "value" : [ ] 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.schemaLookupMode", 
     "value" : "SUBJECT" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.subject", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.subjectToRegister", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.schemaId", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.avroCompression", 
     "value" : "NULL" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.binaryFieldPath", 
     "value" : "/" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.protoDescriptorFile", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.messageType", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.fileNameEL", 
     "value" : null 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.wholeFileExistsAction", 
     "value" : "TO_ERROR" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.includeChecksumInTheEvents", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.checksumAlgorithm", 
     "value" : "MD5" 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.xmlPrettyPrint", 
     "value" : true 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.xmlValidateSchema", 
     "value" : false 
     }, { 
     "name" : "configs.dataGeneratorFormatConfig.xmlSchema", 
     "value" : null 
     }, { 
     "name" : "stageOnRecordError", 
     "value" : "TO_ERROR" 
     }, { 
     "name" : "stageRequiredFields", 
     "value" : [ ] 
     }, { 
     "name" : "stageRecordPreconditions", 
     "value" : [ ] 
     } ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "local_sink", 
     "xPos" : 471.2611083984375, 
     "yPos" : 44.572269439697266, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ "KafkaConsumer_01OutputLane15081523550290" ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    } ], 
    "errorStage" : { 
     "instanceName" : "WritetoFile_ErrorStage", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_recordstolocalfilesystem_ToErrorLocalFSDTarget", 
     "stageVersion" : "1", 
     "configuration" : [ { 
     "name" : "directory", 
     "value" : "/tmp/data_monitor/out/error" 
     }, { 
     "name" : "uniquePrefix", 
     "value" : "sdc-${sdc:id()}" 
     }, { 
     "name" : "rotationIntervalSecs", 
     "value" : "${1 * HOURS}" 
     }, { 
     "name" : "maxFileSizeMbs", 
     "value" : 512 
     } ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "Error Records - Write to File", 
     "xPos" : 622, 
     "yPos" : 50, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    }, 
    "info" : { 
     "pipelineId" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb", 
     "title" : "connector_test_data_file2", 
     "description" : "", 
     "created" : 1508175563039, 
     "lastModified" : 1508177203011, 
     "creator" : "admin", 
     "lastModifier" : "admin", 
     "lastRev" : "0", 
     "uuid" : "13fea9f9-5253-432b-8bf4-09dfbe763dcd", 
     "valid" : true, 
     "metadata" : { 
     "labels" : [ ] 
     }, 
     "name" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb", 
     "sdcVersion" : "2.7.1.1", 
     "sdcId" : "a2869998-a9c9-11e7-94f2-31416694a1b6" 
    }, 
    "metadata" : { 
     "labels" : [ ] 
    }, 
    "statsAggregatorStage" : { 
     "instanceName" : "WritetoDPMdirectly_StatsAggregatorStage", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget", 
     "stageVersion" : "1", 
     "configuration" : [ ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "Stats Aggregator - Write to DPM directly", 
     "xPos" : 280, 
     "yPos" : 50, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    }, 
    "startEventStages" : [ { 
     "instanceName" : "Discard_StartEventStage", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget", 
     "stageVersion" : "1", 
     "configuration" : [ ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "Start Event - Discard", 
     "xPos" : 280, 
     "yPos" : 50, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    } ], 
    "stopEventStages" : [ { 
     "instanceName" : "Discard_StopEventStage", 
     "library" : "streamsets-datacollector-basic-lib", 
     "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget", 
     "stageVersion" : "1", 
     "configuration" : [ ], 
     "uiInfo" : { 
     "description" : "", 
     "label" : "Stop Event - Discard", 
     "xPos" : 280, 
     "yPos" : 50, 
     "stageType" : "TARGET" 
     }, 
     "inputLanes" : [ ], 
     "outputLanes" : [ ], 
     "eventLanes" : [ ] 
    } ], 
    "valid" : true, 
    "issues" : { 
     "stageIssues" : { }, 
     "pipelineIssues" : [ ], 
     "issueCount" : 0 
    }, 
    "previewable" : true 
    }, 
    "pipelineRules" : { 
    "schemaVersion" : 3, 
    "version" : 2, 
    "metricsRuleDefinitions" : [ { 
     "id" : "badRecordsAlertID", 
     "alertText" : "High incidence of Error Records", 
     "metricId" : "pipeline.batchErrorRecords.counter", 
     "metricType" : "COUNTER", 
     "metricElement" : "COUNTER_COUNT", 
     "condition" : "${value() > 100}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    }, { 
     "id" : "stageErrorAlertID", 
     "alertText" : "High incidence of Stage Errors", 
     "metricId" : "pipeline.batchErrorMessages.counter", 
     "metricType" : "COUNTER", 
     "metricElement" : "COUNTER_COUNT", 
     "condition" : "${value() > 100}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    }, { 
     "id" : "idleGaugeID", 
     "alertText" : "Pipeline is Idle", 
     "metricId" : "RuntimeStatsGauge.gauge", 
     "metricType" : "GAUGE", 
     "metricElement" : "TIME_OF_LAST_RECEIVED_RECORD", 
     "condition" : "${time:now() - value() > 120000}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    }, { 
     "id" : "batchTimeAlertID", 
     "alertText" : "Batch taking more time to process", 
     "metricId" : "RuntimeStatsGauge.gauge", 
     "metricType" : "GAUGE", 
     "metricElement" : "CURRENT_BATCH_AGE", 
     "condition" : "${value() > 200}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    }, { 
     "id" : "memoryLimitAlertID", 
     "alertText" : "Memory limit for pipeline exceeded", 
     "metricId" : "pipeline.memoryConsumed.counter", 
     "metricType" : "COUNTER", 
     "metricElement" : "COUNTER_COUNT", 
     "condition" : "${value() > (jvm:maxMemoryMB() * 0.65)}", 
     "sendEmail" : false, 
     "enabled" : false, 
     "timestamp" : 1508152340733, 
     "valid" : true 
    } ], 
    "dataRuleDefinitions" : [ ], 
    "driftRuleDefinitions" : [ ], 
    "uuid" : "6ab2f9cf-0043-48e7-8e5f-1a3d2ec86d4d", 
    "configuration" : [ { 
     "name" : "emailIDs", 
     "value" : [ ] 
    }, { 
     "name" : "webhookConfigs", 
     "value" : [ ] 
    } ], 
    "configIssues" : [ ], 
    "ruleIssues" : [ ] 
    }, 
    "libraryDefinitions" : null 
} 
+0

코드를 표시하십시오! – Clonkex

+0

왜 이것이 될지 모르지만 ... StreamSets Google 그룹, 슬랙 채널 및 전용 Q & A 사이트가 있습니다. https://streamsets.com/community/를 참조하십시오. 커뮤니티와 직접 상호 작용할 수 있습니다. – metadaddy

+0

저는 코드를 사용하지 않고 비주얼 에디터 만 사용합니다 - 카프카 소비자가 "origin"하고 "destination"로컬 FS가 생겼습니다. –

답변

1

기록을 위해 우리는 이것을 다른 곳에서 알아 냈습니다. Java date patterns과 관련이 있습니다.

2017-10-20 13:15:33.611796 같이 표현식이 ${time:extractDateFromString(record:value("/ts"),"yyyy-MM-dd hh:mm:ss.SSSSSS")} 인 문자열을 밀리 초로 구문 분석하는 것은 예상대로 작동하지 않습니다. S은 밀리 초를 의미하므로 611,796 마이크로 초는 611,796 밀리 초로 해석되고 611 초 (10 분 조금 넘는 시간)가 시간에 더해져 값이 2017-10-20 01:25:44.796 인 Date 객체가 생성됩니다.

수신 문자열에서 마이크로 초를 자르기 (예 : ${time:extractDateFromString(str:substring(record:value("/ts"), 0, 23),"yyyy-MM-dd hh:mm:ss.SSS")}). 필요한 경우 마이크로 초를 캡처 할 수 있습니다. ${str:substring(record:value("/ts"), 23, 26)}

0

를 수출했다. VM의 시스템 시간이 호스트 시스템의 시스템 시간에서 벗어나고 Docker를 다시 시작하여 해결할 수 있다는 오랜 알려진 문제점이 있습니다. 이게 너에게 영향을 미칠 가능성은 있니?

+0

CentOS 서버에서 스트림 세트를 실행합니다. 다른 하나는 Kafka, CentOS에서는 스트림 세트를 실행합니다. 도커는 사용되지 않습니다. 나는 이것이 나의 경우가 아니라고 생각한다. –