1

게시 하위 주제에 로그 내보내기를 사용하도록 설정했습니다. 데이터 흐름을 사용하여 이러한 로그를 처리하고 관련 열을 BigQuery에 저장합니다. 누군가가 pubsub 메시지 페이로드를 LogEntry 개체로 변환하는 것을 도울 수 있습니까? 다음 코드 시도 :로그 내보내기에서 pubsub 페이로드를 LogEntry 개체로 변환하는 방법

@ProcessElement 
public void processElement(ProcessContext c) throws Exception { 
    PubsubMessage pubsubMessage = c.element(); 

    ObjectMapper mapper = new ObjectMapper(); 

    byte[] payload = pubsubMessage.getPayload(); 
    String s = new String(payload, "UTF8"); 
    LogEntry logEntry = mapper.readValue(s, LogEntry.class); 
} 

을하지만, 나는 다음과 같은 오류가 발생했습니다 :

com.fasterxml.jackson.databind.JsonMappingException: Can not find a (Map) Key deserializer for type [simple type, class com.google.protobuf.Descriptors$FieldDescriptor] 

편집 : 나는 다음과 같은 코드를 시도 :

try { 
     ByteArrayInputStream stream = new ByteArrayInputStream(Base64.decodeBase64(pubsubMessage.getPayload())); 
     LogEntry logEntry = LogEntry.parseDelimitedFrom(stream); 
     System.out.println("Log Entry = " + logEntry); 
    } catch (InvalidProtocolBufferException e) { 
     e.printStackTrace(); 
    } 

을하지만 난 다음 얻을 오류 :

com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

답변

0

JSON format parser이 가능해야합니다. 자바는 나의 힘이 아니다,하지만 난 당신이 뭔가를 찾고 생각 :

@ProcessElement 
public void processElement(ProcessContext c) throws Exception { 
    LogEntry.Builder entryBuilder = LogEntry.newBuilder(); 
    JsonFormat.Parser.usingTypeRegistry(
     JsonFormat.TypeRegistry.newBuilder() 
      .add(LogEntry.getDescriptor()) 
      .build()) 
     .ignoringUnknownFields() 
     .merge(c.element(), entryBuilder); 
    LogEntry entry = entryBuilder.build(); 
    ... 
} 

당신은 유형을 회원 가입없이 멀리 얻을 수 있습니다. C++에서는 프로토 타입이 글로벌 레지스트리에 링크되어 있다고 생각합니다.

서비스가 새 필드를 추가하고 내보내고 proto 설명 자의 복사본을 업데이트하지 않은 경우 "ignoringUnknownFields"를 원할 것입니다. 내 보낸 JSON의 "@type"필드도 문제를 일으킬 수 있습니다.

페이로드의 특수 처리가 필요할 수 있습니다 (예 : JSON에서 제거한 다음 별도로 구문 분석). JSON이라면 파서가 존재하지 않는 하위 메시지를 채우기를 기대합니다. proto 인 경우 ... Any 유형도 등록하면 실제로 작동 할 수 있습니다.