2017-11-14 22 views
0

나는 여러 서비스를 가지고 있는데, 그 중 일부는 Hystrix의 HystrixObservableCommand를 사용하여 다른 서비스를 호출하고 다른 서비스는 HystrixCommand를 사용합니다. 호출 서비스의 traceIds를 HystrixObservableCommand의 Observable에 전달하는 방법은 무엇입니까? 또한 폴백이 호출되면 전달되도록 할 수 있습니까?Hystrix Observables에서 traceIds를 전달하는 방법은 무엇입니까?

모든 서비스는 grpc-java를 사용하고 있습니다. 내가 가진

샘플 코드 :

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 
     String messageFromWorldService = ""; 
     String idFromWorldService = ""; 
     try { 

      Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get(); 
      messageFromWorldService = greeterReply.getMessage(); 
      idFromWorldService = greeterReply.getId(); 
      logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 
     } catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
      logger.warn("Exception when calling WorldService\n" + e); 
     } 

WorldCommand.java 나는 지프 킨을 사용하고

public class WorldCommand extends HystrixObservableCommand<Greeter.GreeterReply> { 

    private static final Logger logger = LoggerFactory.getLogger(WorldCommand.class.getName()); 

    private final Greeter.GreeterRequest greeterRequest; 
    private final WorldServiceGrpc.WorldServiceStub worldServiceStub; 

    public WorldCommand(Greeter.GreeterRequest greeterRequest, WorldServiceGrpc.WorldServiceStub worldServiceStub) { 
     super(HystrixCommandGroupKey.Factory.asKey("WorldService")); 
     this.greeterRequest = greeterRequest; 
     this.worldServiceStub = worldServiceStub; 
    } 

    @Override 
    protected Observable<Greeter.GreeterReply> construct() { 
     Context context = Context.current(); 
     return Observable.create(new Observable.OnSubscribe<Greeter.GreeterReply>() { 
      @Override 
      public void call(Subscriber<? super Greeter.GreeterReply> observer) { 
       logger.info("In WorldCommand"); 
       if (!observer.isUnsubscribed()) { 
        //pass on the context, if you want only certain headers to pass on then create a new Context and attach it. 
        context.attach(); 
        logger.info("In WorldCommand after attach"); 
        worldServiceStub.greetWithHelloOrWorld(greeterRequest, new StreamObserver<Greeter.GreeterReply>() { 
         @Override 
         public void onNext(Greeter.GreeterReply greeterReply) { 
          logger.info("Response from WorldService -- {}, id = {}", greeterReply.getMessage(), greeterReply.getId()); 
          observer.onNext(greeterReply); 
          observer.onCompleted(); 
         } 

         @Override 
         public void onError(Throwable t) { 
          logger.info("Exception from WorldService -- {}", t); 
         } 

         @Override 
         public void onCompleted() { 

         } 
        }); 
       } 
      } 
     }).subscribeOn(Schedulers.io()); 
    } 

    @Override 
    protected Observable<Greeter.GreeterReply> resumeWithFallback() { 
     logger.info("Response from fallback"); 
     Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("teammate").setId("-1").build(); 
     return Observable.just(greeterReply); 
    } 

추적을 grpc 및 MDCCurrentTraceContext는 로그의 traceId 및 spanId를 인쇄 할 수 있습니다.

WorldCommand의 로그 항목은 모두 추적 및 범위 ID를 인쇄하지 않으며 RxIoScheduler 스레드에서 호출됩니다.

EDIT 마이크에 의해 제안

추가 ConcurrencyStrategy.

public class CustomHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { 

    private static final Logger log = LoggerFactory.getLogger(CustomHystrixConcurrencyStrategy.class); 

    public <T> Callable<T> wrapCallable(Callable<T> callable){ 
     log.info("In CustomHystrixConcurrencyStrategy: callable="+ callable.toString()); 
     return new ContextCallable<>(callable); 
    } 
} 

HelloService는 두 개의 서비스 World 및 Team을 호출합니다. WorldCommand는 HystrixObservableCommand이고 TeamCommand는 HystrixCommand입니다.

logger.info("In the HelloService:greetWithHelloWorld"); 
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build(); 

//Call WorldService 
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client"); 
//Async stub instead of blockingStub 
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel); 

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 
String messageFromWorldService = ""; 
String idFromWorldService = ""; 
try { 

    Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get(); 
    messageFromWorldService = greeterReply.getMessage(); 
    idFromWorldService = greeterReply.getId(); 
    logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
    logger.warn("Exception when calling WorldService\n" + e); 
} 

//Call TeamService 
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client"); 
TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel); 
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub); 

String messageFromTeamService = ""; 
String idFromTeamService = ""; 
try { 
    Greeter.GreeterReply greeterReply = teamCommand.construct().toBlocking().toFuture().get(); 
    messageFromTeamService = greeterReply.getMessage(); 
    idFromTeamService = greeterReply.getId(); 
    logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService); 
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
    logger.warn("Exception when calling TeamService\n" + e); 
} 

assert(idFromWorldService.equals(idFromTeamService)); 
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build(); 
responseObserver.onNext(greeterReply); 
responseObserver.onCompleted(); 

PreservableContext 클래스

public class PreservableContexts { 

    //private final TraceContext traceContext; 
    private static final Logger logger = LoggerFactory.getLogger(PreservableContexts.class.getName()); 

    public PreservableContexts() { 
     logger.info("Creating new PreservableContexts"); 
     //this.traceContext = TraceContextHolder.getContext(); 
    } 

    public void set() { 
     // if (traceContext != null) { 
      //TraceContextHolder.setContext(traceContext); 
     // } 
    } 

    public void clear() { 
     //TraceContextHolder.clearContext(); 
    } 

로그 PreservableContexts 및 CustomHystrixConcurrencyStrategy 인쇄 만나지. HelloServer를 시작할 때 startegy를 등록합니다.

HystrixConcurrencyStrategy strategy = new CustomHystrixConcurrencyStrategy(); 
     HystrixPlugins.getInstance().registerConcurrencyStrategy(strategy); 
     context = HystrixRequestContext.initializeContext(); 

편집 2

이 Observables은이 설정 방법 업데이트 :

ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client"); 
    //Async stub instead of blockingStub 
    WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel); 
    WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 

    //Call TeamService 
    ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client"); 
    TeamServiceGrpc.TeamServiceStub teamServiceStub = TeamServiceGrpc.newStub(teamChannel); 
    //TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel); 
    TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub); 

    try { 
     rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation()); 
     rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation()); 
     Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() { 
      @Override 
      public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) { 
       String messageFromWorldService = worldReply.getMessage(); 
       String idFromWorldService = worldReply.getId(); 
       logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 

       String messageFromTeamService = teamReply.getMessage(); 
       String idFromTeamService = teamReply.getId(); 
       logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService); 

       assert(idFromWorldService.equals(idFromTeamService)); 
       Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build(); 
       logger.info("Final response=" + greeterReply.getMessage()); 
       responseObserver.onNext(greeterReply); 
       responseObserver.onCompleted(); 
       return null; 
      } 
     }); 
    } catch (StatusRuntimeException e) { 
     logger.warn("Exception when calling WorldService and/or TeamService\n" + e); 
    } 

내가 지금 이상한 문제가 TeamCommand에 전화를하고 WorldCommand이 같이 완료되지 않습니다 코드가 실행되지 않습니다 :

Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() { 
       @Override 
       public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) { 
        String messageFromWorldService = worldReply.getMessage(); 

또한 폴백이 있으면 hystrix-timer 스레드에 MDC가 더 이상 없습니다.

답변

0

hysterix에 대한 많은 지식이 없지만 추적 ID와 같은 컨텍스트 정보를 전달하려는 경우 io.grpc.Context가 올바른 클래스입니다. traceID를 사용하여 새 컨텍스트를 만들려면 context.withValue에 전화해야합니다. 데이터가 필요한 장소에서 컨텍스트를 첨부해야합니다. 또한 완료되면 컨텍스트를 분리해야합니다. 귀하의 스 니펫에서 이러한 일이 발생하지는 않습니다. 당신은 사용할 필요가

0

...

HystrixPlugins.getInstance().registerConcurrencyStrategy(...)

가 ... 자신의 Callable을 사용하는 사용자 지정 HystrixConcurrencyStrategy을 등록 ...지프 킨 콘텍스트를 보존 할 수있는 헬퍼 클래스 비아

public class ConcurrencyStrategy extends HystrixConcurrencyStrategy {  
    @Override 
    public <K> Callable<K> wrapCallable(Callable<K> c) { 
     return new ContextCallable<>(c); 
    } 
} 

는 ... 즉 ... ... ... 상기 회로 주위

public class ContextCallable<K> implements Callable<K> { 

    private final Callable<K> callable; 
    private final PreservableContexts contexts; 

    public ContextCallable(Callable<K> actual) { 
     this.callable = actual; 
     this.contexts = new PreservableContexts(); 
    } 

    @Override 
    public K call() throws Exception { 
     contexts.set(); 
     try { 
      return callable.call(); 
     } finally { 
      contexts.clear(); 
     } 
    } 
} 

를 컨텍스트 보존 적용

public class PreservableContexts { 

    private final TraceContext traceContext; 

    public PreservableContexts() { 
     this.traceContext = TraceContextHolder.getContext(); 
    } 

    public void set() { 
     if (traceContext != null) { 
      TraceContextHolder.setContext(traceContext); 
     } 
    } 

    public void clear() { 
     TraceContextHolder.clearContext(); 
    } 

} 

... 그리고 보존하려는 다른 컨텍스트를 추가하는 쉬운 방법을 허용합니다. MDC, SecurityContext 등 ...

+0

사용자 정의 ConcurrentStrategy 클래스를 작성하고 로그를 추가하여 호출 중이라는 것을 확인했지만 로그를 인쇄하지 않습니다. 코멘트에서 정중하게 형식을 지정하기가 어렵 기 때문에 나는이 질문을 편집했다. 어떤 아이디어? 또한, HystrixConcurrenyStrategy의 Javadoc은 HystrixCommand를 사용하고 HystrixObservableCommand는 사용하지 않는다고 언급합니다. '''예를 들어, HystrixCommand에 의해 실행되는 모든 Callable은 wrapCallable (Callable)을 호출하여 사용자 정의 구현이 추가 동작으로 Callable을 장식 할 기회를 제공합니다. ''' 사용자 정의 ConcurrentStrategy가 둘 모두에 대해 작동합니까? – user2237511

+0

[스레드 격리 전략] (https://github.com/Netflix/Hystrix/wiki/How-it-Works#threads-thread-pools)을 사용한다고 가정하면 [동시성 전략] (https : // github .com/Netflix/Hystrix/wiki/Plugins # concurrencystrategy)가 적용되어야합니다 ... 세마포어 격리에 대해서는 확실하지 않지만 어쨌든 그것을 선택하지 않는 것처럼 보입니다. 방금 ​​테스트 한 결과 로컬에서 Observable 명령이 플러그인을 트리거합니다. 내가 가진 문제는 명령을 호출하는 방법입니다. 'teamCommand.construct()'대신'teamCommand.execute()'또는'teamCommand.queue()'를 사용하십시오. – Mike

+0

[동기 실행] (https://github.com/Netflix/Hystrix/wiki/How-To-Use#synchronous-execution) 및 [비동기 실행] (https://github.com/Netflix/Hystrix/)에 대한 지침 wiki/How-To-Use # asynchronous-execution)은 [How To Use] (https://github.com/Netflix/Hystrix/wiki/How-To-Use) 페이지에서 찾을 수 있습니다. – Mike