etcd是一个开源的、分布式的键值对数据存储系统,提供共享配置、服务的注册和发现。etcd与zookeeper相比算是轻量级系统,两者都依靠一致性协议来保证集群数据的一致,但etcd使用的raft比zookeeper使用的ZAB(类paxos协议)更简单、更易理解。关于windows版本的etcd服务端和nodejs浏览器的下载和安装,见《etcd服务端和客户端安装》一文。
我们用etcd,就需要etcd客户端,这里用的是java客户端etcd4j。etcd客户端通过http发送get、put、post、delete等操作到服务端执行对目录信息的增删查改。etcd应用于微服务架构中的角色是服务注册中心,通过对接口调用信息的添加和查询,提供服务的注册和发现能力。
下面结合etcd4j的主要功能类EtcdClient,我们看看etcd的一些操作:
/* Copyright (c) 2015, Jurriaan Mous and contributors as indicated by the @author tags. */
package mousio.etcd4j;import io.netty.handler.ssl.SslContext;
import mousio.client.retry.RetryPolicy;
import mousio.client.retry.RetryWithExponentialBackOff;
import mousio.etcd4j.requests.*;
import mousio.etcd4j.responses.EtcdAuthenticationException;
import mousio.etcd4j.responses.EtcdException;
import mousio.etcd4j.responses.EtcdHealthResponse;
import mousio.etcd4j.responses.EtcdMembersResponse;
import mousio.etcd4j.responses.EtcdSelfStatsResponse;
import mousio.etcd4j.responses.EtcdStoreStatsResponse;
import mousio.etcd4j.responses.EtcdVersionResponse;
import mousio.etcd4j.responses.EtcdLeaderStatsResponse;
import mousio.etcd4j.transport.EtcdClientImpl;
import mousio.etcd4j.transport.EtcdNettyClient;import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeoutException;/*** Etcd client.*/
/**
 * Entry point of the etcd4j client.
 *
 * <p>An instance wraps an {@link EtcdClientImpl} transport and a {@link RetryPolicy}. The
 * key-space methods ({@code put}, {@code get}, {@code delete}, ...) only build a request object
 * bound to that transport and policy; nothing is sent until the caller invokes {@code send()} on
 * the returned request. The status methods ({@code version}, {@code getSelfStats}, ...) send the
 * request themselves and return {@code null} when the call fails.
 */
public class EtcdClient implements Closeable {

  /**
   * Endpoint used when the caller supplies no URI.
   * NOTE(review): the https scheme is used even when no SSL context is supplied - confirm that
   * this is intended.
   */
  private static final String DEFAULT_ENDPOINT = "https://127.0.0.1:4001";

  private final EtcdClientImpl client;
  private RetryPolicy retryHandler;

  /**
   * Constructor without any security settings.
   *
   * @param baseUri URI to create connection on; the default endpoint is used when none is given
   */
  public EtcdClient(URI... baseUri) {
    this(EtcdSecurityContext.NONE, baseUri);
  }

  /**
   * Constructor with credentials.
   *
   * @param username username
   * @param password password
   * @param baseUri  URI to create connection on; the default endpoint is used when none is given
   */
  public EtcdClient(String username, String password, URI... baseUri) {
    this(EtcdSecurityContext.withCredential(username, password), baseUri);
  }

  /**
   * Constructor with an SSL context and credentials.
   *
   * @param sslContext context for Ssl connections
   * @param username   username
   * @param password   password
   * @param baseUri    URI to create connection on; the default endpoint is used when none is given
   */
  public EtcdClient(SslContext sslContext, String username, String password, URI... baseUri) {
    this(new EtcdSecurityContext(sslContext, username, password), baseUri);
  }

  /**
   * Constructor with an SSL context.
   *
   * @param sslContext context for Ssl connections
   * @param baseUri    URI to create connection on; the default endpoint is used when none is given
   */
  public EtcdClient(SslContext sslContext, URI... baseUri) {
    this(EtcdSecurityContext.withSslContext(sslContext), baseUri);
  }

  /**
   * Constructor that creates an {@link EtcdNettyClient} as transport.
   *
   * @param securityContext context for security
   * @param baseUri         URI to create connection on; the default endpoint is used when the
   *                        array is {@code null} or empty
   */
  public EtcdClient(EtcdSecurityContext securityContext, URI... baseUri) {
    this(new EtcdNettyClient(securityContext, orDefaultEndpoint(baseUri)));
  }

  /**
   * Create a client with a custom implementation. The retry policy is initialised to
   * {@link RetryWithExponentialBackOff#DEFAULT}.
   *
   * @param etcdClientImpl to create client with.
   */
  public EtcdClient(EtcdClientImpl etcdClientImpl) {
    this.client = etcdClientImpl;
    this.retryHandler = RetryWithExponentialBackOff.DEFAULT;
  }

  /**
   * Substitutes the default endpoint when no URI was supplied.
   *
   * @param baseUri URIs passed by the caller, may be {@code null} or empty
   * @return {@code baseUri} itself when it has at least one element, otherwise a one-element
   *     array holding the default endpoint
   */
  private static URI[] orDefaultEndpoint(URI[] baseUri) {
    // A null varargs array previously caused a NullPointerException on baseUri.length.
    if (baseUri == null || baseUri.length == 0) {
      return new URI[] { URI.create(DEFAULT_ENDPOINT) };
    }
    return baseUri;
  }

  /**
   * Get the version of the Etcd server
   *
   * @return version as String, or {@code null} if the request failed
   * @deprecated use version() when using etcd 2.1+.
   */
  @Deprecated
  public String getVersion() {
    try {
      return new EtcdOldVersionRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the version of the Etcd server
   *
   * @return version, or {@code null} if the request failed
   */
  public EtcdVersionResponse version() {
    try {
      return new EtcdVersionRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Self Statistics of Etcd
   *
   * @return EtcdSelfStatsResponse, or {@code null} if the request failed
   */
  public EtcdSelfStatsResponse getSelfStats() {
    try {
      return new EtcdSelfStatsRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Leader Statistics of Etcd
   *
   * @return EtcdLeaderStatsResponse, or {@code null} if the request failed
   */
  public EtcdLeaderStatsResponse getLeaderStats() {
    try {
      return new EtcdLeaderStatsRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Store Statistics of Etcd
   *
   * @return EtcdStoreStatsResponse, or {@code null} if the request failed
   */
  public EtcdStoreStatsResponse getStoreStats() {
    try {
      return new EtcdStoreStatsRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Members of Etcd
   *
   * @return EtcdMembersResponse, or {@code null} if the request failed
   */
  public EtcdMembersResponse getMembers() {
    try {
      return new EtcdMembersRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Health of Etcd
   *
   * @return EtcdHealthResponse, or {@code null} if the request failed
   */
  public EtcdHealthResponse getHealth() {
    try {
      return new EtcdHealthRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Put a key with a value
   *
   * @param key   to put
   * @param value to put on key
   * @return the prepared put request (not yet sent)
   */
  public EtcdKeyPutRequest put(String key, String value) {
    return new EtcdKeyPutRequest(client, key, retryHandler).value(value);
  }

  /**
   * Refresh a key with new ttl
   * (without notifying watchers when using etcd 2.3+)
   *
   * @param key to refresh
   * @param ttl to update key with
   * @return the prepared put request (not yet sent)
   */
  public EtcdKeyPutRequest refresh(String key, Integer ttl) {
    return new EtcdKeyPutRequest(client, key, retryHandler).refresh(ttl);
  }

  /**
   * Create a dir
   *
   * @param dir to create
   * @return the prepared put request (not yet sent)
   */
  public EtcdKeyPutRequest putDir(String dir) {
    return new EtcdKeyPutRequest(client, dir, retryHandler).isDir();
  }

  /**
   * Post a value to a key for in-order keys.
   *
   * @param key   to post to
   * @param value to post
   * @return the prepared post request (not yet sent)
   */
  public EtcdKeyPostRequest post(String key, String value) {
    return new EtcdKeyPostRequest(client, key, retryHandler).value(value);
  }

  /**
   * Deletes a key
   *
   * @param key to delete
   * @return the prepared delete request (not yet sent)
   */
  public EtcdKeyDeleteRequest delete(String key) {
    return new EtcdKeyDeleteRequest(client, key, retryHandler);
  }

  /**
   * Deletes a directory
   *
   * @param dir to delete
   * @return the prepared delete request (not yet sent)
   */
  public EtcdKeyDeleteRequest deleteDir(String dir) {
    return new EtcdKeyDeleteRequest(client, dir, retryHandler).dir();
  }

  /**
   * Get by key
   *
   * @param key to get
   * @return the prepared get request (not yet sent)
   */
  public EtcdKeyGetRequest get(String key) {
    return new EtcdKeyGetRequest(client, key, retryHandler);
  }

  /**
   * Get directory
   *
   * @param dir to get
   * @return the prepared get request (not yet sent)
   */
  public EtcdKeyGetRequest getDir(String dir) {
    return new EtcdKeyGetRequest(client, dir, retryHandler).dir();
  }

  /**
   * Get all keys
   *
   * @return the prepared get request (not yet sent)
   */
  public EtcdKeyGetRequest getAll() {
    return new EtcdKeyGetRequest(client, retryHandler);
  }

  /**
   * Closes the underlying client implementation, if one is set.
   *
   * @throws IOException if closing the underlying client fails
   */
  @Override
  public void close() throws IOException {
    if (client != null) {
      client.close();
    }
  }

  /**
   * Set the retry handler used by requests created after this call. The default is
   * {@link RetryWithExponentialBackOff#DEFAULT}.
   *
   * @param retryHandler to set
   * @return this instance
   */
  public EtcdClient setRetryHandler(RetryPolicy retryHandler) {
    this.retryHandler = retryHandler;
    return this;
  }
}
这个类提供了连接etcd的方法(即各个构造器),提供了对etcd的操作方法(get、put、post、putDir、delete、deleteDir等),还支持设置重试策略(参见RetryPolicy)。我们跟一下EtcdClient(URI... baseUri):它会进入EtcdClient(EtcdSecurityContext securityContext, URI... baseUri),并在其中实例化EtcdNettyClient:
/* Copyright (c) 2015, Jurriaan Mous and contributors as indicated by the @author tags. */
package mousio.etcd4j.transport;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddresses;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import mousio.client.ConnectionState;
import mousio.client.retry.RetryHandler;
import mousio.etcd4j.EtcdSecurityContext;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.requests.EtcdRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.CancellationException;/*** @author Jurriaan Mous* @author Luca Burgazzoli** Netty client for the requests and responses*/
public class EtcdNettyClient implements EtcdClientImpl {private static final Logger logger = LoggerFactory.getLogger(EtcdNettyClient.class);// default etcd portprivate static final int DEFAULT_PORT = 2379;private static final String ENV_ETCD4J_ENDPOINT = "ETCD4J_ENDPOINT";private final EventLoopGroup eventLoopGroup;private final URI[] uris;private final Bootstrap bootstrap;//private final String hostName;private final EtcdNettyConfig config;private final EtcdSecurityContext securityContext;protected volatile int lastWorkingUriIndex;/*** Constructor** @param sslContext SSL context if connecting with SSL. Null if not connecting with SSL.* @param uri to connect to*/public EtcdNettyClient(final SslContext sslContext, final URI... uri) {this(new EtcdNettyConfig(), sslContext, uri);}/*** Constructor** @param securityContext security context.* @param uri to connect to*/public EtcdNettyClient(final EtcdSecurityContext securityContext, final URI... uri) {this(new EtcdNettyConfig(), securityContext, uri);}/*** Constructor with custom eventloop group and timeout** @param config for netty* @param sslContext SSL context if connecting with SSL. Null if not connecting with SSL.* @param uris to connect to*/public EtcdNettyClient(final EtcdNettyConfig config,final SslContext sslContext, final URI... uris) {this(config, new EtcdSecurityContext(sslContext), uris);}/*** Constructor with custom eventloop group and timeout** @param config for netty* @param uris to connect to*/public EtcdNettyClient(final EtcdNettyConfig config, final URI... uris) {this(config, EtcdSecurityContext.NONE, uris);}/*** Constructor with custom eventloop group and timeout** @param config for netty* @param securityContext security context (ssl, authentication)* @param uris to connect to*/public EtcdNettyClient(final EtcdNettyConfig config,final EtcdSecurityContext securityContext, final URI... 
uris) {logger.info("Setting up Etcd4j Netty client");this.lastWorkingUriIndex = 0;this.config = config.clone();this.securityContext = securityContext.clone();this.uris = uris;this.eventLoopGroup = config.getEventLoopGroup();this.bootstrap = new Bootstrap().group(eventLoopGroup).channel(config.getSocketChannelClass()).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout()).resolver(new DnsAddressResolverGroup(NioDatagramChannel.class,DnsServerAddresses.defaultAddresses())).handler(new ChannelInitializer
}
进入EtcdNettyClient(final EtcdSecurityContext securityContext, final URI... uri),再进入public EtcdNettyClient(final EtcdNettyConfig config, final EtcdSecurityContext securityContext, final URI... uris),可以看到其中通过Bootstrap配置了netty的EventLoopGroup、Channel类型和各项连接参数,也就是说etcd4j客户端是基于netty框架实现网络通信的。