How do I pass traceIds in Hystrix Observables?
I have several services. Some of them call other services using Hystrix's HystrixObservableCommand, and others use HystrixCommand. How can I pass the traceIds from the calling service into the Observable of a HystrixObservableCommand? And can they also be propagated when the fallback is invoked?
All services use grpc-java.
Sample code:
WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
String messageFromWorldService = "";
String idFromWorldService = "";
try {
    Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get();
    messageFromWorldService = greeterReply.getMessage();
    idFromWorldService = greeterReply.getId();
    logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService);
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
    logger.warn("Exception when calling WorldService\n" + e);
}
WorldCommand.java (I am using Zipkin for tracing):
public class WorldCommand extends HystrixObservableCommand<Greeter.GreeterReply> {
    private static final Logger logger = LoggerFactory.getLogger(WorldCommand.class.getName());
    private final Greeter.GreeterRequest greeterRequest;
    private final WorldServiceGrpc.WorldServiceStub worldServiceStub;
    public WorldCommand(Greeter.GreeterRequest greeterRequest, WorldServiceGrpc.WorldServiceStub worldServiceStub) {
        super(HystrixCommandGroupKey.Factory.asKey("WorldService"));
        this.greeterRequest = greeterRequest;
        this.worldServiceStub = worldServiceStub;
    }
    @Override
    protected Observable<Greeter.GreeterReply> construct() {
        Context context = Context.current();
        return Observable.create(new Observable.OnSubscribe<Greeter.GreeterReply>() {
            @Override
            public void call(Subscriber<? super Greeter.GreeterReply> observer) {
                logger.info("In WorldCommand");
                if (!observer.isUnsubscribed()) {
                    // Pass on the context. To pass on only certain headers, create a new Context and attach it.
                    context.attach();
                    logger.info("In WorldCommand after attach");
                    worldServiceStub.greetWithHelloOrWorld(greeterRequest, new StreamObserver<Greeter.GreeterReply>() {
                        @Override
                        public void onNext(Greeter.GreeterReply greeterReply) {
                            logger.info("Response from WorldService -- {}, id = {}", greeterReply.getMessage(), greeterReply.getId());
                            observer.onNext(greeterReply);
                            observer.onCompleted();
                        }
                        @Override
                        public void onError(Throwable t) {
                            logger.info("Exception from WorldService -- {}", t);
                        }
                        @Override
                        public void onCompleted() {
                        }
                    });
                }
            }
        }).subscribeOn(Schedulers.io());
    }
    @Override
    protected Observable<Greeter.GreeterReply> resumeWithFallback() {
        logger.info("Response from fallback");
        Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("teammate").setId("-1").build();
        return Observable.just(greeterReply);
    }
}
Zipkin's gRPC tracing together with MDCCurrentTraceContext lets me print the traceId and spanId in the logs.
None of the log entries in WorldCommand print the trace and span IDs, and they are all invoked on RxIoScheduler threads.
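The missing IDs are consistent with how thread-local state behaves: a value set on the calling thread is not visible on a different pool thread unless something copies it across. The sketch below uses only the JDK and a plain `ThreadLocal` as a stand-in for the MDC or trace context. The class name `ThreadLocalLossDemo` and the field `TRACE_ID` are hypothetical, and this is an illustration of the general mechanism, not of Zipkin or Hystrix internals.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalLossDemo {
    // Assumption: a plain ThreadLocal stands in for the MDC / trace context.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Reads the trace id from a separate worker thread, as a scheduler thread would. */
    static String readOnWorkerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = TRACE_ID::get; // evaluated on the worker thread
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        TRACE_ID.set("abc123");
        System.out.println("caller thread: " + TRACE_ID.get());
        System.out.println("worker thread: " + readOnWorkerThread());
    }
}
```

The worker thread sees no value because nothing transferred it, which matches log lines on RxIoScheduler threads lacking the trace and span IDs.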
EDIT
Added a ConcurrencyStrategy, as suggested by Mike:
public class CustomHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private static final Logger log = LoggerFactory.getLogger(CustomHystrixConcurrencyStrategy.class);
    public <T> Callable<T> wrapCallable(Callable<T> callable){
        log.info("In CustomHystrixConcurrencyStrategy: callable="+ callable.toString());
        return new ContextCallable<>(callable);
    }
}
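The `ContextCallable` class is not shown in the question. A wrapper of this kind usually captures the caller's context when it is constructed and restores it around the delegate's `call()` on the worker thread. Here is a minimal JDK-only sketch of that pattern, assuming a plain `ThreadLocal` in place of the real trace context; `TraceCallable`, `TRACE_ID`, and `readViaWrappedCallable` are hypothetical names, not part of Hystrix or Zipkin.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical wrapper: captures the trace id at construction, restores it in call(). */
final class TraceCallable<T> implements Callable<T> {
    // Assumption: a plain ThreadLocal stands in for the MDC / trace context.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final Callable<T> delegate;
    private final String captured = TRACE_ID.get(); // read on the constructing thread

    TraceCallable(Callable<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T call() throws Exception {
        String previous = TRACE_ID.get();
        TRACE_ID.set(captured);              // make the caller's id visible on this thread
        try {
            return delegate.call();
        } finally {
            if (previous == null) {
                TRACE_ID.remove();           // leave the pooled thread clean
            } else {
                TRACE_ID.set(previous);
            }
        }
    }

    /** Sets an id on the calling thread and reads it back from a worker thread via the wrapper. */
    static String readViaWrappedCallable(String traceId) {
        TRACE_ID.set(traceId);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> wrapped = new TraceCallable<String>(TRACE_ID::get);
            return pool.submit(wrapped).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
            TRACE_ID.remove();
        }
    }
}
```

The capture has to happen in the constructor, because that is the only point at which the code still runs on the thread that holds the context.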
HelloService calls two services, World and Team. WorldCommand is a HystrixObservableCommand and TeamCommand is a HystrixCommand.
logger.info("In the HelloService:greetWithHelloWorld");
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();
//Call WorldService
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client");
//Async stub instead of blockingStub
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel);
WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
String messageFromWorldService = "";
String idFromWorldService = "";
try {
    Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get();
    messageFromWorldService = greeterReply.getMessage();
    idFromWorldService = greeterReply.getId();
    logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService);
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
    logger.warn("Exception when calling WorldService\n" + e);
}
//Call TeamService
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client");
TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel);
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub);
String messageFromTeamService = "";
String idFromTeamService = "";
try {
    Greeter.GreeterReply greeterReply = teamCommand.construct().toBlocking().toFuture().get();
    messageFromTeamService = greeterReply.getMessage();
    idFromTeamService = greeterReply.getId();
    logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService);
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
    logger.warn("Exception when calling TeamService\n" + e);
}
assert(idFromWorldService.equals(idFromTeamService));
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build();
responseObserver.onNext(greeterReply);
responseObserver.onCompleted();
The PreservableContexts class:
public class PreservableContexts {
    //private final TraceContext traceContext;
    private static final Logger logger = LoggerFactory.getLogger(PreservableContexts.class.getName());
    public PreservableContexts() {
        logger.info("Creating new PreservableContexts");
        //this.traceContext = TraceContextHolder.getContext();
    }
    public void set() {
        // if (traceContext != null) {
        //TraceContextHolder.setContext(traceContext);
        // }
    }
    public void clear() {
        //TraceContextHolder.clearContext();
    }
}
The log statements in PreservableContexts and CustomHystrixConcurrencyStrategy are never printed. I register the strategy when starting HelloServer:
HystrixConcurrencyStrategy strategy = new CustomHystrixConcurrencyStrategy();
HystrixPlugins.getInstance().registerConcurrencyStrategy(strategy);
context = HystrixRequestContext.initializeContext();
EDIT 2
Updated how the Observables are set up:
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client");
//Async stub instead of blockingStub
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel);
WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
//Call TeamService
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client");
TeamServiceGrpc.TeamServiceStub teamServiceStub = TeamServiceGrpc.newStub(teamChannel);
//TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel);
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub);
try {
    rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
    rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
    Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
        @Override
        public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) {
            String messageFromWorldService = worldReply.getMessage();
            String idFromWorldService = worldReply.getId();
            logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService);
            String messageFromTeamService = teamReply.getMessage();
            String idFromTeamService = teamReply.getId();
            logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService);
            assert(idFromWorldService.equals(idFromTeamService));
            Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build();
            logger.info("Final response=" + greeterReply.getMessage());
            responseObserver.onNext(greeterReply);
            responseObserver.onCompleted();
            return null;
        }
    });
} catch (StatusRuntimeException e) {
    logger.warn("Exception when calling WorldService and/or TeamService\n" + e);
}
The strange problem I have now is that the calls to TeamCommand and WorldCommand do not complete, so this code is never executed:
Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
    @Override
    public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) {
        String messageFromWorldService = worldReply.getMessage();
Also, when the fallback runs, the hystrix-timer thread no longer has the MDC.
I wrote a custom ConcurrencyStrategy class and added logging to confirm that it is being called, but the logs are never printed. I edited the question because it is hard to format this properly in a comment. Any ideas? Also, the Javadoc for HystrixConcurrencyStrategy mentions HystrixCommand but not HystrixObservableCommand: '''For example, every Callable executed by HystrixCommand will call wrapCallable(Callable) to give a chance for custom implementations to decorate the Callable with additional behavior.''' Does a custom ConcurrencyStrategy work for both? – user2237511
Assuming you are using the [thread isolation strategy](https://github.com/Netflix/Hystrix/wiki/How-it-Works#threads-thread-pools), the [concurrency strategy](https://github.com/Netflix/Hystrix/wiki/Plugins#concurrencystrategy) should be applied. I am not sure about semaphore isolation, but it does not look like you are opting for that anyway. I just tested this locally, and an Observable command does trigger the plugin. The problem I see is how you are invoking the commands. Instead of `teamCommand.construct()`, use `teamCommand.execute()` or `teamCommand.queue()`. – Mike
Guidance on [synchronous execution](https://github.com/Netflix/Hystrix/wiki/How-To-Use#synchronous-execution) and [asynchronous execution](https://github.com/Netflix/Hystrix/wiki/How-To-Use#asynchronous-execution) can be found on the [How To Use](https://github.com/Netflix/Hystrix/wiki/How-To-Use) page. – Mike
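To illustrate the point about invocation: calling the overridable hook directly skips whatever the framework's entry point wraps around it, including fallback handling and any plugin hooks. The toy model below is not Hystrix. `ToyCommand`, `FailingToyCommand`, and `ToyCommandDemo` are hypothetical classes whose method names only mirror `run()`, `getFallback()`, and `execute()` on HystrixCommand, and the `events` list is an assumption added to make the difference observable.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a command framework; NOT Hystrix. */
abstract class ToyCommand<T> {
    final List<String> events = new ArrayList<>();

    protected abstract T run() throws Exception; // user-supplied work
    protected abstract T getFallback();          // only reached through execute()

    /** Entry point: records that wrapping happened and falls back on failure. */
    public final T execute() {
        events.add("wrapped");
        try {
            return run();
        } catch (Exception e) {
            events.add("fallback");
            return getFallback();
        }
    }
}

final class FailingToyCommand extends ToyCommand<String> {
    @Override
    protected String run() throws Exception {
        throw new IllegalStateException("downstream unavailable");
    }

    @Override
    protected String getFallback() {
        return "teammate";
    }
}

final class ToyCommandDemo {
    /** Calls the hook directly, the way the question calls construct(). */
    static String callHookDirectly(FailingToyCommand cmd) {
        try {
            return cmd.run();
        } catch (Exception e) {
            return "exception: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        FailingToyCommand viaEntryPoint = new FailingToyCommand();
        System.out.println(viaEntryPoint.execute() + " " + viaEntryPoint.events);

        FailingToyCommand direct = new FailingToyCommand();
        System.out.println(callHookDirectly(direct) + " " + direct.events);
    }
}
```

In the toy model the direct call surfaces the raw exception and records no events, while the entry point records the wrapping and returns the fallback value. By analogy, going through the public execution methods of a command rather than `construct()` is what gives the framework a chance to apply its own behavior.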