Browse Source

Refactor into a request-style API (#87)

* Refactor the raw API to be command-oriented

Signed-off-by: Nick Cameron <nrc@ncameron.org>

* WIP: provide mocks and use them for tests

Signed-off-by: Nick Cameron <nrc@ncameron.org>
Nick Cameron 1 month ago
parent
commit
8165adf1f9

+ 2 - 3
examples/raw.rs

@@ -6,7 +6,7 @@
 mod common;
 
 use crate::common::parse_args;
-use tikv_client::{raw::Client, Config, Key, KvPair, Result, ToOwnedRange, Value};
+use tikv_client::{Config, Key, KvPair, RawClient as Client, Result, ToOwnedRange, Value};
 
 const KEY: &str = "TiKV";
 const VALUE: &str = "Rust";
@@ -27,8 +27,7 @@ async fn main() -> Result<()> {
 
     // When we first create a client we receive a `Connect` structure which must be resolved before
     // the client is actually connected and usable.
-    let unconnnected_client = Client::connect(config);
-    let client = unconnnected_client.await?;
+    let client = Client::new(config)?;
 
     // Requests are created from the connected client. These calls return structures which
     // implement `Future`. This means the `Future` must be resolved before the action ever takes

+ 1 - 1
examples/transaction.rs

@@ -7,7 +7,7 @@ mod common;
 use crate::common::parse_args;
 use futures::prelude::*;
 use std::ops::RangeBounds;
-use tikv_client::{transaction::Client, Config, Key, KvPair, Value};
+use tikv_client::{Config, Key, KvPair, TransactionClient as Client, Value};
 
 async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
     let mut txn = client.begin().await.expect("Could not begin a transaction");

+ 12 - 0
src/kv/bound_range.rs

@@ -1,6 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use super::Key;
+use kvproto::kvrpcpb;
 #[cfg(test)]
 use proptest_derive::Arbitrary;
 use std::borrow::Borrow;
@@ -182,6 +183,17 @@ impl<T: Into<Key> + Eq> TryFrom<(Bound<T>, Bound<T>)> for BoundRange {
     }
 }
 
+impl Into<kvrpcpb::KeyRange> for BoundRange {
+    fn into(self) -> kvrpcpb::KeyRange {
+        let (start, end) = self.into_keys();
+        let mut range = kvrpcpb::KeyRange::default();
+        range.set_start_key(start.into());
+        // FIXME handle end = None rather than unwrapping
+        end.map(|k| range.set_end_key(k.into())).unwrap();
+        range
+    }
+}
+
 /// A convenience trait for converting ranges of borrowed types into a `BoundRange`.
 pub trait ToOwnedRange {
     /// Transform a borrowed range of some form into an owned `BoundRange`.

+ 6 - 0
src/kv/key.rs

@@ -111,6 +111,12 @@ impl<'a> Into<&'a [u8]> for &'a Key {
     }
 }
 
+impl AsRef<Key> for Key {
+    fn as_ref(&self) -> &Key {
+        self
+    }
+}
+
 impl fmt::Debug for Key {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, "Key({})", HexRepr(&self.0))

+ 30 - 1
src/kv/kvpair.rs

@@ -1,6 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use super::{HexRepr, Key, Value};
+use kvproto::kvrpcpb;
 #[cfg(test)]
 use proptest_derive::Arbitrary;
 use std::{fmt, str};
@@ -20,7 +21,7 @@ use std::{fmt, str};
 /// types (Like a `(Key, Value)`) can be passed directly to those functions.
 #[derive(Default, Clone, Eq, PartialEq)]
 #[cfg_attr(test, derive(Arbitrary))]
-pub struct KvPair(Key, Value);
+pub struct KvPair(pub Key, pub Value);
 
 impl KvPair {
     /// Create a new `KvPair`.
@@ -92,6 +93,34 @@ impl Into<(Key, Value)> for KvPair {
     }
 }
 
+impl From<kvrpcpb::KvPair> for KvPair {
+    fn from(mut pair: kvrpcpb::KvPair) -> Self {
+        KvPair(Key::from(pair.take_key()), Value::from(pair.take_value()))
+    }
+}
+
+impl Into<kvrpcpb::KvPair> for KvPair {
+    fn into(self) -> kvrpcpb::KvPair {
+        let mut result = kvrpcpb::KvPair::default();
+        let (key, value) = self.into();
+        result.set_key(key.into());
+        result.set_value(value.into());
+        result
+    }
+}
+
+impl AsRef<Key> for KvPair {
+    fn as_ref(&self) -> &Key {
+        &self.0
+    }
+}
+
+impl AsRef<Value> for KvPair {
+    fn as_ref(&self) -> &Value {
+        &self.1
+    }
+}
+
 impl fmt::Debug for KvPair {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         let KvPair(key, value) = self;

+ 340 - 0
src/kv_client/client.rs

@@ -0,0 +1,340 @@
+// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
+
+// FIXME: Remove this when txn is done.
+#![allow(dead_code)]
+
+use derive_new::new;
+use futures::compat::Compat01As03;
+use futures::future::BoxFuture;
+use futures::prelude::*;
+use grpcio::CallOption;
+use kvproto::kvrpcpb;
+use kvproto::tikvpb::TikvClient;
+use std::{sync::Arc, time::Duration};
+
+use crate::{
+    kv_client::HasError,
+    pd::Region,
+    raw::RawRequest,
+    stats::tikv_stats,
+    transaction::{Mutation, TxnInfo},
+    ErrorKind, Key, Result,
+};
+
+/// This client handles requests for a single TiKV node. It converts the data
+/// types and abstractions of the client program into the grpc data types.
+#[derive(new, Clone)]
+pub struct KvRpcClient {
+    rpc_client: Arc<TikvClient>,
+}
+
+impl super::KvClient for KvRpcClient {
+    fn dispatch<T: RawRequest>(
+        &self,
+        request: &T::RpcRequest,
+        opt: CallOption,
+    ) -> BoxFuture<'static, Result<T::RpcResponse>> {
+        map_errors_and_trace(T::REQUEST_NAME, T::RPC_FN(&self.rpc_client, request, opt)).boxed()
+    }
+}
+
+pub struct TransactionRegionClient {
+    region: Region,
+    timeout: Duration,
+    client: Arc<TikvClient>,
+}
+
+// FIXME use `request` method instead.
+macro_rules! txn_request {
+    ($region:expr, $type:ty) => {{
+        let mut req = <$type>::default();
+        // FIXME don't unwrap
+        req.set_context($region.context().unwrap());
+        req
+    }};
+}
+
+impl From<Mutation> for kvrpcpb::Mutation {
+    fn from(mutation: Mutation) -> kvrpcpb::Mutation {
+        let mut pb = kvrpcpb::Mutation::default();
+        match mutation {
+            Mutation::Put(k, v) => {
+                pb.set_op(kvrpcpb::Op::Put);
+                pb.set_key(k.into());
+                pb.set_value(v.into());
+            }
+            Mutation::Del(k) => {
+                pb.set_op(kvrpcpb::Op::Del);
+                pb.set_key(k.into());
+            }
+            Mutation::Lock(k) => {
+                pb.set_op(kvrpcpb::Op::Lock);
+                pb.set_key(k.into());
+            }
+            Mutation::Rollback(k) => {
+                pb.set_op(kvrpcpb::Op::Rollback);
+                pb.set_key(k.into());
+            }
+        };
+        pb
+    }
+}
+
+impl From<TxnInfo> for kvrpcpb::TxnInfo {
+    fn from(txn_info: TxnInfo) -> kvrpcpb::TxnInfo {
+        let mut pb = kvrpcpb::TxnInfo::default();
+        pb.set_txn(txn_info.txn);
+        pb.set_status(txn_info.status);
+        pb
+    }
+}
+
+impl TransactionRegionClient {
+    pub fn kv_get(
+        &self,
+        version: u64,
+        key: Key,
+    ) -> impl Future<Output = Result<kvrpcpb::GetResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::GetRequest);
+        req.set_key(key.into());
+        req.set_version(version);
+
+        map_errors_and_trace(
+            "kv_get",
+            self.client
+                .clone()
+                .kv_get_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_scan(
+        &self,
+        version: u64,
+        start_key: Key,
+        end_key: Key,
+        limit: u32,
+        key_only: bool,
+    ) -> impl Future<Output = Result<kvrpcpb::ScanResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::ScanRequest);
+        req.set_start_key(start_key.into());
+        req.set_end_key(end_key.into());
+        req.set_version(version);
+        req.set_limit(limit);
+        req.set_key_only(key_only);
+
+        map_errors_and_trace(
+            "kv_scan",
+            self.client
+                .clone()
+                .kv_scan_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_prewrite(
+        &self,
+        mutations: impl Iterator<Item = Mutation>,
+        primary_lock: Key,
+        start_version: u64,
+        lock_ttl: u64,
+        skip_constraint_check: bool,
+    ) -> impl Future<Output = Result<kvrpcpb::PrewriteResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::PrewriteRequest);
+        req.set_mutations(mutations.map(Into::into).collect());
+        req.set_primary_lock(primary_lock.into());
+        req.set_start_version(start_version);
+        req.set_lock_ttl(lock_ttl);
+        req.set_skip_constraint_check(skip_constraint_check);
+
+        map_errors_and_trace(
+            "kv_prewrite",
+            self.client
+                .clone()
+                .kv_prewrite_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_commit(
+        &self,
+        keys: impl Iterator<Item = Key>,
+        start_version: u64,
+        commit_version: u64,
+    ) -> impl Future<Output = Result<kvrpcpb::CommitResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::CommitRequest);
+        req.set_keys(keys.map(|x| x.into()).collect());
+        req.set_start_version(start_version);
+        req.set_commit_version(commit_version);
+
+        map_errors_and_trace(
+            "kv_commit",
+            self.client
+                .clone()
+                .kv_commit_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_import(
+        &self,
+        mutations: impl Iterator<Item = Mutation>,
+        commit_version: u64,
+    ) -> impl Future<Output = Result<kvrpcpb::ImportResponse>> {
+        let mut req = kvrpcpb::ImportRequest::default();
+        req.set_mutations(mutations.map(Into::into).collect());
+        req.set_commit_version(commit_version);
+
+        map_errors_and_trace(
+            "kv_import",
+            self.client
+                .clone()
+                .kv_import_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_cleanup(
+        &self,
+        key: Key,
+        start_version: u64,
+    ) -> impl Future<Output = Result<kvrpcpb::CleanupResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::CleanupRequest);
+        req.set_key(key.into());
+        req.set_start_version(start_version);
+
+        map_errors_and_trace(
+            "kv_cleanup",
+            self.client
+                .clone()
+                .kv_cleanup_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_batch_get(
+        &self,
+        keys: impl Iterator<Item = Key>,
+        version: u64,
+    ) -> impl Future<Output = Result<kvrpcpb::BatchGetResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::BatchGetRequest);
+        req.set_keys(keys.map(|x| x.into()).collect());
+        req.set_version(version);
+
+        map_errors_and_trace(
+            "kv_batch_get",
+            self.client
+                .clone()
+                .kv_batch_get_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_batch_rollback(
+        &self,
+        keys: impl Iterator<Item = Key>,
+        start_version: u64,
+    ) -> impl Future<Output = Result<kvrpcpb::BatchRollbackResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::BatchRollbackRequest);
+        req.set_keys(keys.map(|x| x.into()).collect());
+        req.set_start_version(start_version);
+
+        map_errors_and_trace(
+            "kv_batch_rollback",
+            self.client
+                .clone()
+                .kv_batch_rollback_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_scan_lock(
+        &self,
+        start_key: Key,
+        max_version: u64,
+        limit: u32,
+    ) -> impl Future<Output = Result<kvrpcpb::ScanLockResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::ScanLockRequest);
+        req.set_start_key(start_key.into());
+        req.set_max_version(max_version);
+        req.set_limit(limit);
+
+        map_errors_and_trace(
+            "kv_scan_lock",
+            self.client
+                .clone()
+                .kv_scan_lock_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_resolve_lock(
+        &self,
+        txn_infos: impl Iterator<Item = TxnInfo>,
+        start_version: u64,
+        commit_version: u64,
+    ) -> impl Future<Output = Result<kvrpcpb::ResolveLockResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::ResolveLockRequest);
+        req.set_start_version(start_version);
+        req.set_commit_version(commit_version);
+        req.set_txn_infos(txn_infos.map(Into::into).collect());
+
+        map_errors_and_trace(
+            "kv_resolve_lock",
+            self.client
+                .clone()
+                .kv_resolve_lock_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_gc(&self, safe_point: u64) -> impl Future<Output = Result<kvrpcpb::GcResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::GcRequest);
+        req.set_safe_point(safe_point);
+
+        map_errors_and_trace(
+            "kv_gc",
+            self.client
+                .clone()
+                .kv_gc_async_opt(&req, self.call_options()),
+        )
+    }
+
+    pub fn kv_delete_range(
+        &self,
+        start_key: Key,
+        end_key: Key,
+    ) -> impl Future<Output = Result<kvrpcpb::DeleteRangeResponse>> {
+        let mut req = txn_request!(self.region, kvrpcpb::DeleteRangeRequest);
+        req.set_start_key(start_key.into());
+        req.set_end_key(end_key.into());
+
+        map_errors_and_trace(
+            "kv_delete_range",
+            self.client
+                .clone()
+                .kv_delete_range_async_opt(&req, self.call_options()),
+        )
+    }
+
+    fn call_options(&self) -> CallOption {
+        CallOption::default().timeout(self.timeout)
+    }
+}
+
+fn map_errors_and_trace<Resp, RpcFuture>(
+    request_name: &'static str,
+    fut: ::grpcio::Result<RpcFuture>,
+) -> impl Future<Output = Result<Resp>>
+where
+    Compat01As03<RpcFuture>: Future<Output = std::result::Result<Resp, ::grpcio::Error>>,
+    Resp: HasError + Sized + Clone + Send + 'static,
+{
+    let context = tikv_stats(request_name);
+
+    // FIXME should handle the error, not unwrap.
+    Compat01As03::new(fut.unwrap())
+        .map(|r| match r {
+            Err(e) => Err(ErrorKind::Grpc(e).into()),
+            Ok(mut r) => {
+                if let Some(e) = r.region_error() {
+                    Err(e)
+                } else if let Some(e) = r.error() {
+                    Err(e)
+                } else {
+                    Ok(r)
+                }
+            }
+        })
+        .map(move |r| context.done(r))
+}

+ 142 - 0
src/kv_client/errors.rs

@@ -0,0 +1,142 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::Error;
+use kvproto::{errorpb, kvrpcpb};
+
+pub trait HasRegionError {
+    fn region_error(&mut self) -> Option<Error>;
+}
+
+pub trait HasError: HasRegionError {
+    fn error(&mut self) -> Option<Error>;
+}
+
+impl From<errorpb::Error> for Error {
+    fn from(mut e: errorpb::Error) -> Error {
+        let message = e.take_message();
+        if e.has_not_leader() {
+            let e = e.get_not_leader();
+            let message = format!("{}. Leader: {:?}", message, e.get_leader());
+            Error::not_leader(e.get_region_id(), Some(message))
+        } else if e.has_region_not_found() {
+            Error::region_not_found(e.get_region_not_found().get_region_id(), Some(message))
+        } else if e.has_key_not_in_region() {
+            let e = e.take_key_not_in_region();
+            Error::key_not_in_region(e)
+        } else if e.has_epoch_not_match() {
+            Error::stale_epoch(Some(format!(
+                "{}. New epoch: {:?}",
+                message,
+                e.get_epoch_not_match().get_current_regions()
+            )))
+        } else if e.has_server_is_busy() {
+            Error::server_is_busy(e.take_server_is_busy())
+        } else if e.has_stale_command() {
+            Error::stale_command(message)
+        } else if e.has_store_not_match() {
+            Error::store_not_match(e.take_store_not_match(), message)
+        } else if e.has_raft_entry_too_large() {
+            Error::raft_entry_too_large(e.take_raft_entry_too_large(), message)
+        } else {
+            Error::internal_error(message)
+        }
+    }
+}
+
+macro_rules! has_region_error {
+    ($type:ty) => {
+        impl HasRegionError for $type {
+            fn region_error(&mut self) -> Option<Error> {
+                if self.has_region_error() {
+                    Some(self.take_region_error().into())
+                } else {
+                    None
+                }
+            }
+        }
+    };
+}
+
+has_region_error!(kvrpcpb::GetResponse);
+has_region_error!(kvrpcpb::ScanResponse);
+has_region_error!(kvrpcpb::PrewriteResponse);
+has_region_error!(kvrpcpb::CommitResponse);
+has_region_error!(kvrpcpb::ImportResponse);
+has_region_error!(kvrpcpb::BatchRollbackResponse);
+has_region_error!(kvrpcpb::CleanupResponse);
+has_region_error!(kvrpcpb::BatchGetResponse);
+has_region_error!(kvrpcpb::ScanLockResponse);
+has_region_error!(kvrpcpb::ResolveLockResponse);
+has_region_error!(kvrpcpb::GcResponse);
+has_region_error!(kvrpcpb::RawGetResponse);
+has_region_error!(kvrpcpb::RawBatchGetResponse);
+has_region_error!(kvrpcpb::RawPutResponse);
+has_region_error!(kvrpcpb::RawBatchPutResponse);
+has_region_error!(kvrpcpb::RawDeleteResponse);
+has_region_error!(kvrpcpb::RawBatchDeleteResponse);
+has_region_error!(kvrpcpb::DeleteRangeResponse);
+has_region_error!(kvrpcpb::RawDeleteRangeResponse);
+has_region_error!(kvrpcpb::RawScanResponse);
+has_region_error!(kvrpcpb::RawBatchScanResponse);
+
+macro_rules! has_key_error {
+    ($type:ty) => {
+        impl HasError for $type {
+            fn error(&mut self) -> Option<Error> {
+                if self.has_error() {
+                    Some(self.take_error().into())
+                } else {
+                    None
+                }
+            }
+        }
+    };
+}
+
+has_key_error!(kvrpcpb::GetResponse);
+has_key_error!(kvrpcpb::CommitResponse);
+has_key_error!(kvrpcpb::BatchRollbackResponse);
+has_key_error!(kvrpcpb::CleanupResponse);
+has_key_error!(kvrpcpb::ScanLockResponse);
+has_key_error!(kvrpcpb::ResolveLockResponse);
+has_key_error!(kvrpcpb::GcResponse);
+
+macro_rules! has_str_error {
+    ($type:ty) => {
+        impl HasError for $type {
+            fn error(&mut self) -> Option<Error> {
+                if self.get_error().is_empty() {
+                    None
+                } else {
+                    Some(Error::kv_error(self.take_error()))
+                }
+            }
+        }
+    };
+}
+
+has_str_error!(kvrpcpb::RawGetResponse);
+has_str_error!(kvrpcpb::RawPutResponse);
+has_str_error!(kvrpcpb::RawBatchPutResponse);
+has_str_error!(kvrpcpb::RawDeleteResponse);
+has_str_error!(kvrpcpb::RawBatchDeleteResponse);
+has_str_error!(kvrpcpb::RawDeleteRangeResponse);
+has_str_error!(kvrpcpb::ImportResponse);
+has_str_error!(kvrpcpb::DeleteRangeResponse);
+
+macro_rules! has_no_error {
+    ($type:ty) => {
+        impl HasError for $type {
+            fn error(&mut self) -> Option<Error> {
+                None
+            }
+        }
+    };
+}
+
+has_no_error!(kvrpcpb::ScanResponse);
+has_no_error!(kvrpcpb::PrewriteResponse);
+has_no_error!(kvrpcpb::BatchGetResponse);
+has_no_error!(kvrpcpb::RawBatchGetResponse);
+has_no_error!(kvrpcpb::RawScanResponse);
+has_no_error!(kvrpcpb::RawBatchScanResponse);

+ 125 - 0
src/kv_client/mod.rs

@@ -0,0 +1,125 @@
+// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
+
+mod client;
+mod errors;
+
+pub use self::client::KvRpcClient;
+pub use self::errors::HasError;
+pub use kvproto::tikvpb::TikvClient;
+
+use crate::pd::Region;
+use crate::raw::{ColumnFamily, RawRequest};
+use crate::security::SecurityManager;
+use crate::Result;
+use derive_new::new;
+use futures::future::BoxFuture;
+use grpcio::CallOption;
+use grpcio::Environment;
+use kvproto::kvrpcpb;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// A trait for connecting to TiKV stores.
+pub trait KvConnect: Sized {
+    type KvClient: KvClient + Clone + Send + Sync + 'static;
+
+    fn connect(&self, address: &str) -> Result<Self::KvClient>;
+}
+
+pub type RpcFnType<Req, Resp> =
+    for<'a, 'b> fn(
+        &'a TikvClient,
+        &'b Req,
+        CallOption,
+    )
+        -> std::result::Result<::grpcio::ClientUnaryReceiver<Resp>, ::grpcio::Error>;
+
+#[derive(new, Clone)]
+pub struct TikvConnect {
+    env: Arc<Environment>,
+    security_mgr: Arc<SecurityManager>,
+}
+
+impl KvConnect for TikvConnect {
+    type KvClient = KvRpcClient;
+
+    fn connect(&self, address: &str) -> Result<KvRpcClient> {
+        self.security_mgr
+            .connect(self.env.clone(), address, TikvClient::new)
+            .map(|c| KvRpcClient::new(Arc::new(c)))
+    }
+}
+
+pub trait KvClient {
+    fn dispatch<T: RawRequest>(
+        &self,
+        request: &T::RpcRequest,
+        opt: CallOption,
+    ) -> BoxFuture<'static, Result<T::RpcResponse>>;
+}
+
+#[derive(new)]
+pub struct Store<Client: KvClient> {
+    pub region: Region,
+    client: Client,
+    timeout: Duration,
+}
+
+impl<Client: KvClient> Store<Client> {
+    pub fn call_options(&self) -> CallOption {
+        CallOption::default().timeout(self.timeout)
+    }
+
+    pub fn request<T: KvRawRequest>(&self) -> T {
+        let mut request = T::default();
+        // FIXME propagate the error instead of using `expect`
+        request.set_context(
+            self.region
+                .context()
+                .expect("Cannot create context from region"),
+        );
+        request
+    }
+
+    pub fn dispatch<T: RawRequest>(
+        &self,
+        request: &T::RpcRequest,
+        opt: CallOption,
+    ) -> BoxFuture<'static, Result<T::RpcResponse>> {
+        self.client.dispatch::<T>(request, opt)
+    }
+}
+
+pub trait KvRawRequest: Default {
+    fn set_cf(&mut self, cf: String);
+    fn set_context(&mut self, context: kvrpcpb::Context);
+
+    fn maybe_set_cf(&mut self, cf: Option<ColumnFamily>) {
+        if let Some(cf) = cf {
+            self.set_cf(cf.to_string());
+        }
+    }
+}
+
+macro_rules! impl_raw_request {
+    ($name: ident) => {
+        impl KvRawRequest for kvrpcpb::$name {
+            fn set_cf(&mut self, cf: String) {
+                self.set_cf(cf);
+            }
+            fn set_context(&mut self, context: kvrpcpb::Context) {
+                self.set_context(context);
+            }
+        }
+    };
+}
+
+impl_raw_request!(RawGetRequest);
+impl_raw_request!(RawBatchGetRequest);
+impl_raw_request!(RawPutRequest);
+impl_raw_request!(RawBatchPutRequest);
+impl_raw_request!(RawDeleteRequest);
+impl_raw_request!(RawBatchDeleteRequest);
+impl_raw_request!(RawScanRequest);
+impl_raw_request!(RawBatchScanRequest);
+impl_raw_request!(RawDeleteRangeRequest);

+ 22 - 7
src/lib.rs

@@ -3,7 +3,9 @@
 // Long and nested future chains can quickly result in large generic types.
 #![type_length_limit = "16777216"]
 #![allow(clippy::redundant_closure)]
+#![allow(clippy::type_complexity)]
 #![feature(async_await)]
+#![cfg_attr(test, feature(specialization))]
 
 //! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
 //! distributed transactional Key-Value database written in Rust.
@@ -33,7 +35,7 @@
 //! operations affecting multiple keys or values, or operations that depend on strong ordering.
 //!
 //! ```rust
-//! use tikv_client::{*, transaction::*};
+//! use tikv_client::*;
 //! ```
 //!
 //! ### Raw
@@ -45,7 +47,7 @@
 //! key) requirements. You will not be able to use transactions with this API.
 //!
 //! ```rust
-//! use tikv_client::{*, raw::*};
+//! use tikv_client::*;
 //! ```
 //!
 //! ## Connect
@@ -55,7 +57,7 @@
 //!
 //! ```rust
 //! # #![feature(async_await)]
-//! # use tikv_client::{*, raw::*};
+//! # use tikv_client::*;
 //! # use futures::prelude::*;
 //!
 //! # futures::executor::block_on(async {
@@ -66,7 +68,7 @@
 //! ]).with_security("root.ca", "internal.cert", "internal.key");
 //!
 //! // Get an unresolved connection.
-//! let connect = Client::connect(config);
+//! let connect = TransactionClient::connect(config);
 //!
 //! // Resolve the connection into a client.
 //! let client = connect.into_future().await;
@@ -75,16 +77,23 @@
 //!
 //! At this point, you should seek the documentation in the related API modules.
 
+#[macro_use]
+mod util;
+
 mod compat;
 mod config;
 mod errors;
 mod kv;
-#[cfg(test)]
-mod proptests;
+mod kv_client;
+mod pd;
 pub mod raw;
-mod rpc;
+mod security;
+mod stats;
 pub mod transaction;
 
+#[cfg(test)]
+mod proptests;
+
 #[macro_use]
 extern crate lazy_static;
 #[macro_use]
@@ -102,3 +111,9 @@ pub use crate::errors::ErrorKind;
 pub use crate::errors::Result;
 #[doc(inline)]
 pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value};
+#[doc(inline)]
+pub use crate::raw::{Client as RawClient, ColumnFamily};
+#[doc(inline)]
+pub use crate::transaction::{
+    Client as TransactionClient, Connect, Snapshot, Timestamp, Transaction,
+};

+ 371 - 0
src/pd/client.rs

@@ -0,0 +1,371 @@
+// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+    time::Duration,
+};
+
+use futures::future::BoxFuture;
+use futures::future::{ready, Either};
+use futures::prelude::*;
+use futures::stream::BoxStream;
+use grpcio::EnvBuilder;
+
+use crate::{
+    compat::{stream_fn, ClientFutureExt},
+    kv::BoundRange,
+    kv_client::{KvClient, KvConnect, Store, TikvConnect},
+    pd::{Region, RegionId, RetryClient},
+    security::SecurityManager,
+    transaction::Timestamp,
+    Config, Key, Result,
+};
+
+const CQ_COUNT: usize = 1;
+const CLIENT_PREFIX: &str = "tikv-client";
+
+pub trait PdClient: Send + Sync + 'static {
+    type KvClient: KvClient + Send + Sync + 'static;
+
+    fn map_region_to_store(
+        self: Arc<Self>,
+        region: Region,
+    ) -> BoxFuture<'static, Result<Store<Self::KvClient>>>;
+
+    fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>>;
+
+    fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>>;
+
+    fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>>;
+
+    fn store_for_key(
+        self: Arc<Self>,
+        key: &Key,
+    ) -> BoxFuture<'static, Result<Store<Self::KvClient>>> {
+        self.region_for_key(key)
+            .and_then(move |region| self.clone().map_region_to_store(region))
+            .boxed()
+    }
+
+    fn store_for_id(
+        self: Arc<Self>,
+        id: RegionId,
+    ) -> BoxFuture<'static, Result<Store<Self::KvClient>>> {
+        self.region_for_id(id)
+            .and_then(move |region| self.clone().map_region_to_store(region).boxed())
+            .boxed()
+    }
+
+    fn group_keys_by_region<K: AsRef<Key> + Send + Sync + 'static>(
+        self: Arc<Self>,
+        keys: impl Iterator<Item = K> + Send + Sync + 'static,
+    ) -> BoxStream<'static, Result<(RegionId, Vec<K>)>> {
+        let keys = keys.peekable();
+        stream_fn(keys, move |mut keys| {
+            if let Some(key) = keys.next() {
+                Either::Left(self.region_for_key(key.as_ref()).map_ok(move |region| {
+                    let id = region.id();
+                    let mut grouped = vec![key];
+                    while let Some(key) = keys.peek() {
+                        if !region.contains(key.as_ref()) {
+                            break;
+                        }
+                        grouped.push(keys.next().unwrap());
+                    }
+                    Some((keys, (id, grouped)))
+                }))
+            } else {
+                Either::Right(ready(Ok(None)))
+            }
+        })
+        .boxed()
+    }
+
+    // Returns a Steam which iterates over the contexts for each region covered by range.
+    fn stores_for_range(
+        self: Arc<Self>,
+        range: BoundRange,
+    ) -> BoxStream<'static, Result<Store<Self::KvClient>>> {
+        let (start_key, end_key) = range.into_keys();
+        stream_fn(Some(start_key), move |start_key| {
+            let start_key = match start_key {
+                None => return Either::Right(ready(Ok(None))),
+                Some(sk) => sk,
+            };
+            let end_key = end_key.clone();
+
+            let this = self.clone();
+            Either::Left(self.region_for_key(&start_key).and_then(move |region| {
+                let region_end = region.end_key();
+                this.map_region_to_store(region).map_ok(move |store| {
+                    if end_key.map(|x| x < region_end).unwrap_or(false) || region_end.is_empty() {
+                        return Some((None, store));
+                    }
+                    Some((Some(region_end), store))
+                })
+            }))
+        })
+        .boxed()
+    }
+}
+
+/// This client converts requests for the logical TiKV cluster into requests
+/// for a single TiKV store using PD and internal logic.
+pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect> {
+    pd: Arc<RetryClient>,
+    kv_connect: KvC,
+    kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
+    timeout: Duration,
+}
+
+impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
+    type KvClient = KvC::KvClient;
+
+    fn map_region_to_store(
+        self: Arc<Self>,
+        region: Region,
+    ) -> BoxFuture<'static, Result<Store<KvC::KvClient>>> {
+        let timeout = self.timeout;
+        // FIXME propagate this error instead of using `unwrap`.
+        let store_id = region.get_store_id().unwrap();
+        self.pd
+            .clone()
+            .get_store(store_id)
+            .ok_and_then(move |store| self.kv_client(store.get_address()))
+            .map_ok(move |kv_client| Store::new(region, kv_client, timeout))
+            .boxed()
+    }
+
+    fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>> {
+        self.pd.clone().get_region_by_id(id).boxed()
+    }
+
+    fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>> {
+        self.pd.clone().get_region(key.into()).boxed()
+    }
+
+    fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
+        self.pd.clone().get_timestamp()
+    }
+}
+
+impl PdRpcClient<TikvConnect> {
+    pub fn connect(config: &Config) -> Result<PdRpcClient> {
+        let env = Arc::new(
+            EnvBuilder::new()
+                .cq_count(CQ_COUNT)
+                .name_prefix(thread_name!(CLIENT_PREFIX))
+                .build(),
+        );
+        let security_mgr = Arc::new(
+            if let (Some(ca_path), Some(cert_path), Some(key_path)) =
+                (&config.ca_path, &config.cert_path, &config.key_path)
+            {
+                SecurityManager::load(ca_path, cert_path, key_path)?
+            } else {
+                SecurityManager::default()
+            },
+        );
+
+        let pd = Arc::new(RetryClient::connect(
+            env.clone(),
+            &config.pd_endpoints,
+            security_mgr.clone(),
+            config.timeout,
+        )?);
+        let kv_client_cache = Default::default();
+        let kv_connect = TikvConnect::new(env, security_mgr);
+        Ok(PdRpcClient {
+            pd,
+            kv_client_cache,
+            kv_connect,
+            timeout: config.timeout,
+        })
+    }
+}
+
+impl<KvC: KvConnect + Send + Sync + 'static> PdRpcClient<KvC> {
+    fn kv_client(&self, address: &str) -> Result<KvC::KvClient> {
+        if let Some(client) = self.kv_client_cache.read().unwrap().get(address) {
+            return Ok(client.clone());
+        };
+        info!("connect to tikv endpoint: {:?}", address);
+        self.kv_connect.connect(address).map(|client| {
+            self.kv_client_cache
+                .write()
+                .unwrap()
+                .insert(address.to_owned(), client.clone());
+            client
+        })
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use super::*;
+    use crate::raw::{MockDispatch, RawRequest, RawScan};
+    use crate::Error;
+
+    use futures::executor;
+    use futures::future::{ready, BoxFuture};
+    use grpcio::CallOption;
+    use kvproto::kvrpcpb;
+    use kvproto::metapb;
+
+    // FIXME move all the mocks to their own module
+    pub struct MockKvClient;
+
+    impl KvClient for MockKvClient {
+        fn dispatch<T: RawRequest>(
+            &self,
+            _request: &T::RpcRequest,
+            _opt: CallOption,
+        ) -> BoxFuture<'static, Result<T::RpcResponse>> {
+            unreachable!()
+        }
+    }
+
+    impl MockDispatch for RawScan {
+        fn mock_dispatch(
+            &self,
+            request: &kvrpcpb::RawScanRequest,
+            _opt: CallOption,
+        ) -> Option<BoxFuture<'static, Result<kvrpcpb::RawScanResponse>>> {
+            assert!(request.key_only);
+            assert_eq!(request.limit, 10);
+
+            let mut resp = kvrpcpb::RawScanResponse::default();
+            for i in request.start_key[0]..request.end_key[0] {
+                let mut kv = kvrpcpb::KvPair::default();
+                kv.key = vec![i];
+                resp.kvs.push(kv);
+            }
+
+            Some(Box::pin(ready(Ok(resp))))
+        }
+    }
+
+    pub struct MockPdClient;
+
+    impl PdClient for MockPdClient {
+        type KvClient = MockKvClient;
+
+        fn map_region_to_store(
+            self: Arc<Self>,
+            region: Region,
+        ) -> BoxFuture<'static, Result<Store<Self::KvClient>>> {
+            Box::pin(ready(Ok(Store::new(
+                region,
+                MockKvClient,
+                Duration::from_secs(60),
+            ))))
+        }
+
+        fn region_for_key(&self, key: &Key) -> BoxFuture<'static, Result<Region>> {
+            let bytes: &[_] = key.into();
+            let region = if bytes.is_empty() || bytes[0] < 10 {
+                Self::region1()
+            } else {
+                Self::region2()
+            };
+
+            Box::pin(ready(Ok(region)))
+        }
+        fn region_for_id(&self, id: RegionId) -> BoxFuture<'static, Result<Region>> {
+            let result = match id {
+                1 => Ok(Self::region1()),
+                2 => Ok(Self::region2()),
+                _ => Err(Error::region_not_found(id, None)),
+            };
+
+            Box::pin(ready(result))
+        }
+
+        fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
+            unimplemented!()
+        }
+    }
+
+    // fn get_store(self: Arc<Self>, id: StoreId) -> BoxFuture<'static, Result<metapb::Store>> {
+    //     let mut result = metapb::Store::default();
+    //     result.set_address(format!("store-address-{}", id));
+    //     Box::pin(ready(Ok(result)))
+    // }
+
+    impl MockPdClient {
+        fn region1() -> Region {
+            let mut region = Region::default();
+            region.region.id = 1;
+            region.region.set_start_key(vec![0]);
+            region.region.set_end_key(vec![10]);
+
+            let mut leader = metapb::Peer::default();
+            leader.store_id = 41;
+            region.leader = Some(leader);
+
+            region
+        }
+
+        fn region2() -> Region {
+            let mut region = Region::default();
+            region.region.id = 2;
+            region.region.set_start_key(vec![10]);
+            region.region.set_end_key(vec![250, 250]);
+
+            let mut leader = metapb::Peer::default();
+            leader.store_id = 42;
+            region.leader = Some(leader);
+
+            region
+        }
+    }
+
+    // TODO needs us to mock out the KvConnect in PdRpcClient
+    // #[test]
+    // fn test_kv_client() {
+    //     let client = MockPdClient;
+    //     let addr1 = "foo";
+    //     let addr2 = "bar";
+
+    //     let kv1 = client.kv_client(&addr1).unwrap();
+    //     let kv2 = client.kv_client(&addr2).unwrap();
+    //     let kv3 = client.kv_client(&addr2).unwrap();
+    //     assert!(&*kv1 as *const _ != &*kv2 as *const _);
+    //     assert_eq!(&*kv2 as *const _, &*kv3 as *const _);
+    // }
+
+    #[test]
+    fn test_group_keys_by_region() {
+        let client = MockPdClient;
+
+        // FIXME This only works if the keys are in order of regions. Not sure if
+        // that is a reasonable constraint.
+        let tasks: Vec<Key> = vec![
+            vec![1].into(),
+            vec![2].into(),
+            vec![3].into(),
+            vec![5, 2].into(),
+            vec![12].into(),
+            vec![11, 4].into(),
+        ];
+
+        let stream = Arc::new(client).group_keys_by_region(tasks.into_iter());
+        let mut stream = executor::block_on_stream(stream);
+
+        assert_eq!(
+            stream.next().unwrap().unwrap().1,
+            vec![
+                vec![1].into(),
+                vec![2].into(),
+                vec![3].into(),
+                vec![5, 2].into()
+            ]
+        );
+        assert_eq!(
+            stream.next().unwrap().unwrap().1,
+            vec![vec![12].into(), vec![11, 4].into()]
+        );
+        assert!(stream.next().is_none());
+    }
+}

+ 24 - 126
src/rpc/pd/client.rs

@@ -1,26 +1,24 @@
 // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
 
+// FIXME: Remove this when txn is done.
+#![allow(dead_code)]
+
 use std::{
     collections::HashSet,
-    fmt,
     sync::{Arc, RwLock},
     time::{Duration, Instant},
 };
 
 use futures::compat::Compat01As03;
-use futures::future::BoxFuture;
 use futures::prelude::*;
 use grpcio::{CallOption, Environment};
 use kvproto::{metapb, pdpb};
 
 use crate::{
-    rpc::{
-        pd::{
-            context::request_context, request::retry_request, timestamp::TimestampOracle, Region,
-            RegionId, StoreId, Timestamp,
-        },
-        security::SecurityManager,
-    },
+    pd::{timestamp::TimestampOracle, Region, RegionId, StoreId},
+    security::SecurityManager,
+    stats::pd_stats,
+    transaction::Timestamp,
     Error, Result,
 };
 
@@ -34,110 +32,6 @@ macro_rules! pd_request {
     }};
 }
 
-pub trait PdClient: Sized {
-    fn connect(
-        env: Arc<Environment>,
-        endpoints: &[String],
-        security_mgr: Arc<SecurityManager>,
-        timeout: Duration,
-    ) -> Result<Self>;
-
-    fn get_region(self: Arc<Self>, key: &[u8]) -> BoxFuture<'static, Result<Region>>;
-
-    fn get_region_by_id(self: Arc<Self>, id: RegionId) -> BoxFuture<'static, Result<Region>>;
-
-    fn get_store(self: Arc<Self>, id: StoreId) -> BoxFuture<'static, Result<metapb::Store>>;
-
-    fn get_all_stores(self: Arc<Self>) -> BoxFuture<'static, Result<Vec<metapb::Store>>>;
-
-    /// Request a timestamp from the PD cluster.
-    fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>>;
-}
-
-/// Client for communication with a PD cluster. Has the facility to reconnect to the cluster.
-pub struct RetryClient {
-    cluster: RwLock<Cluster>,
-    connection: Connection,
-    timeout: Duration,
-}
-
-impl PdClient for RetryClient {
-    fn connect(
-        env: Arc<Environment>,
-        endpoints: &[String],
-        security_mgr: Arc<SecurityManager>,
-        timeout: Duration,
-    ) -> Result<RetryClient> {
-        let connection = Connection::new(env, security_mgr);
-        let cluster = RwLock::new(connection.connect_cluster(endpoints, timeout)?);
-        Ok(RetryClient {
-            cluster,
-            connection,
-            timeout,
-        })
-    }
-
-    // These get_* functions will try multiple times to make a request, reconnecting as necessary.
-    fn get_region(self: Arc<Self>, key: &[u8]) -> BoxFuture<'static, Result<Region>> {
-        let key = key.to_owned();
-        let timeout = self.timeout;
-        Box::pin(retry_request(self, move |cluster| {
-            cluster.get_region(key.clone(), timeout)
-        }))
-    }
-
-    fn get_region_by_id(self: Arc<Self>, id: RegionId) -> BoxFuture<'static, Result<Region>> {
-        let timeout = self.timeout;
-        Box::pin(retry_request(self, move |cluster| {
-            cluster.get_region_by_id(id, timeout)
-        }))
-    }
-
-    fn get_store(self: Arc<Self>, id: StoreId) -> BoxFuture<'static, Result<metapb::Store>> {
-        let timeout = self.timeout;
-        Box::pin(retry_request(self, move |cluster| {
-            cluster.get_store(id, timeout)
-        }))
-    }
-
-    fn get_all_stores(self: Arc<Self>) -> BoxFuture<'static, Result<Vec<metapb::Store>>> {
-        let timeout = self.timeout;
-        Box::pin(retry_request(self, move |cluster| {
-            cluster.get_all_stores(timeout)
-        }))
-    }
-
-    fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
-        // FIXME: retry or reconnect on error
-        Box::pin(self.cluster.read().unwrap().get_timestamp())
-    }
-}
-
-impl RetryClient {
-    pub fn reconnect(&self, interval: u64) -> Result<()> {
-        if let Some(cluster) =
-            self.connection
-                .reconnect(&self.cluster.read().unwrap(), interval, self.timeout)?
-        {
-            *self.cluster.write().unwrap() = cluster;
-        }
-        Ok(())
-    }
-
-    pub fn with_cluster<T, F: Fn(&Cluster) -> T>(&self, f: F) -> T {
-        f(&self.cluster.read().unwrap())
-    }
-}
-
-impl fmt::Debug for RetryClient {
-    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        fmt.debug_struct("pd::RetryClient")
-            .field("cluster_id", &self.cluster.read().unwrap().id)
-            .field("timeout", &self.timeout)
-            .finish()
-    }
-}
-
 /// A PD cluster.
 pub struct Cluster {
     pub id: u64,
@@ -148,8 +42,12 @@ pub struct Cluster {
 
 // These methods make a single attempt to make a request.
 impl Cluster {
-    fn get_region(&self, key: Vec<u8>, timeout: Duration) -> impl Future<Output = Result<Region>> {
-        let context = request_context("get_region");
+    pub fn get_region(
+        &self,
+        key: Vec<u8>,
+        timeout: Duration,
+    ) -> impl Future<Output = Result<Region>> {
+        let context = pd_stats("get_region");
         let option = CallOption::default().timeout(timeout);
 
         let mut req = pd_request!(self.id, pdpb::GetRegionRequest);
@@ -175,12 +73,12 @@ impl Cluster {
             })
     }
 
-    fn get_region_by_id(
+    pub fn get_region_by_id(
         &self,
         id: RegionId,
         timeout: Duration,
     ) -> impl Future<Output = Result<Region>> {
-        let context = request_context("get_region_by_id");
+        let context = pd_stats("get_region_by_id");
         let option = CallOption::default().timeout(timeout);
 
         let mut req = pd_request!(self.id, pdpb::GetRegionByIdRequest);
@@ -204,12 +102,12 @@ impl Cluster {
             })
     }
 
-    fn get_store(
+    pub fn get_store(
         &self,
         id: StoreId,
         timeout: Duration,
     ) -> impl Future<Output = Result<metapb::Store>> {
-        let context = request_context("get_store");
+        let context = pd_stats("get_store");
         let option = CallOption::default().timeout(timeout);
 
         let mut req = pd_request!(self.id, pdpb::GetStoreRequest);
@@ -231,11 +129,11 @@ impl Cluster {
             })
     }
 
-    fn get_all_stores(
+    pub fn get_all_stores(
         &self,
         timeout: Duration,
     ) -> impl Future<Output = Result<Vec<metapb::Store>>> {
-        let context = request_context("get_all_stores");
+        let context = pd_stats("get_all_stores");
         let option = CallOption::default().timeout(timeout);
 
         let req = pd_request!(self.id, pdpb::GetAllStoresRequest);
@@ -256,20 +154,20 @@ impl Cluster {
             })
     }
 
-    fn get_timestamp(&self) -> impl Future<Output = Result<Timestamp>> {
+    pub fn get_timestamp(&self) -> impl Future<Output = Result<Timestamp>> {
         self.tso.clone().get_timestamp()
     }
 }
 
 /// An object for connecting and reconnecting to a PD cluster.
-struct Connection {
+pub struct Connection {
     env: Arc<Environment>,
     security_mgr: Arc<SecurityManager>,
     last_update: RwLock<Instant>,
 }
 
 impl Connection {
-    fn new(env: Arc<Environment>, security_mgr: Arc<SecurityManager>) -> Connection {
+    pub fn new(env: Arc<Environment>, security_mgr: Arc<SecurityManager>) -> Connection {
         Connection {
             env,
             security_mgr,
@@ -277,7 +175,7 @@ impl Connection {
         }
     }
 
-    fn connect_cluster(&self, endpoints: &[String], timeout: Duration) -> Result<Cluster> {
+    pub fn connect_cluster(&self, endpoints: &[String], timeout: Duration) -> Result<Cluster> {
         let members = self.validate_endpoints(endpoints, timeout)?;
         let (client, members) = self.try_connect_leader(&members, timeout)?;
 
@@ -293,7 +191,7 @@ impl Connection {
     }
 
     // Re-establish connection with PD leader in synchronized fashion.
-    fn reconnect(
+    pub fn reconnect(
         &self,
         old_cluster: &Cluster,
         interval: u64,

+ 20 - 20
src/rpc/pd/mod.rs

@@ -1,18 +1,18 @@
 // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
 
-// TODO: Remove this when txn is done.
-#![allow(dead_code)]
-
 use derive_new::new;
 use kvproto::{kvrpcpb, metapb, pdpb};
 
-pub use crate::rpc::pd::client::{PdClient, RetryClient};
 use crate::{Error, Key, Result};
+#[cfg(test)]
+pub use client::test::{MockKvClient, MockPdClient};
+pub use client::{PdClient, PdRpcClient};
+pub use retry::RetryClient;
 
-#[macro_use]
 mod client;
-mod context;
-mod request;
+#[macro_use]
+mod cluster;
+mod retry;
 mod timestamp;
 
 pub type RegionId = u64;
@@ -32,6 +32,7 @@ pub struct Region {
 }
 
 impl Region {
+    #[allow(dead_code)]
     pub fn switch_peer(&mut self, _to: StoreId) -> Result<()> {
         unimplemented!()
     }
@@ -56,14 +57,19 @@ impl Region {
             })
     }
 
-    pub fn start_key(&self) -> &[u8] {
-        self.region.get_start_key()
+    pub fn start_key(&self) -> Key {
+        self.region.get_start_key().to_vec().into()
     }
 
-    pub fn end_key(&self) -> &[u8] {
-        self.region.get_end_key()
+    pub fn end_key(&self) -> Key {
+        self.region.get_end_key().to_vec().into()
     }
 
+    pub fn range(&self) -> (Key, Key) {
+        (self.start_key(), self.end_key())
+    }
+
+    #[allow(dead_code)]
     pub fn ver_id(&self) -> RegionVerId {
         let region = &self.region;
         let epoch = region.get_region_epoch();
@@ -78,21 +84,15 @@ impl Region {
         self.region.get_id()
     }
 
-    pub fn peer(&self) -> Result<metapb::Peer> {
+    pub fn get_store_id(&self) -> Result<StoreId> {
         self.leader
             .as_ref()
-            .map(Clone::clone)
-            .map(Into::into)
+            .cloned()
             .ok_or_else(|| Error::stale_epoch(None))
+            .map(|s| s.get_store_id())
     }
 }
 
-#[derive(Eq, PartialEq, Debug, Clone, Copy)]
-pub struct Timestamp {
-    pub physical: i64,
-    pub logical: i64,
-}
-
 trait PdResponse {
     fn header(&self) -> &pdpb::ResponseHeader;
 }

+ 2 - 2
src/rpc/pd/request.rs

@@ -15,8 +15,8 @@ use std::pin::Pin;
 use tokio_timer::timer::Handle;
 
 use crate::{
-    rpc::pd::client::{Cluster, RetryClient},
-    rpc::util::GLOBAL_TIMER_HANDLE,
+    pd::client::{Cluster, RetryClient},
+    util::GLOBAL_TIMER_HANDLE,
     Result,
 };
 

+ 220 - 0
src/pd/retry.rs

@@ -0,0 +1,220 @@
+// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
+
+//! A utility module for managing and retrying PD requests.
+
+use std::{
+    fmt,
+    sync::{Arc, RwLock},
+    time::{Duration, Instant},
+};
+
+use futures::compat::Compat01As03;
+use futures::future::BoxFuture;
+use futures::prelude::*;
+use futures::ready;
+use futures::task::{Context, Poll};
+use grpcio::Environment;
+use kvproto::metapb;
+use std::pin::Pin;
+use tokio_timer::timer::Handle;
+
+use crate::{
+    pd::{
+        cluster::{Cluster, Connection},
+        Region, RegionId, StoreId,
+    },
+    security::SecurityManager,
+    transaction::Timestamp,
+    util::GLOBAL_TIMER_HANDLE,
+    Result,
+};
+
+const RECONNECT_INTERVAL_SEC: u64 = 1;
+const MAX_REQUEST_COUNT: usize = 3;
+const LEADER_CHANGE_RETRY: usize = 10;
+
+/// Client for communication with a PD cluster. Has the facility to reconnect to the cluster.
+pub struct RetryClient {
+    cluster: RwLock<Cluster>,
+    connection: Connection,
+    timeout: Duration,
+}
+
+impl RetryClient {
+    pub fn connect(
+        env: Arc<Environment>,
+        endpoints: &[String],
+        security_mgr: Arc<SecurityManager>,
+        timeout: Duration,
+    ) -> Result<RetryClient> {
+        let connection = Connection::new(env, security_mgr);
+        let cluster = RwLock::new(connection.connect_cluster(endpoints, timeout)?);
+        Ok(RetryClient {
+            cluster,
+            connection,
+            timeout,
+        })
+    }
+
+    // These get_* functions will try multiple times to make a request, reconnecting as necessary.
+    pub fn get_region(self: Arc<Self>, key: &[u8]) -> BoxFuture<'static, Result<Region>> {
+        let key = key.to_owned();
+        let timeout = self.timeout;
+        Box::pin(retry_request(self, move |cluster| {
+            cluster.get_region(key.clone(), timeout)
+        }))
+    }
+
+    pub fn get_region_by_id(self: Arc<Self>, id: RegionId) -> BoxFuture<'static, Result<Region>> {
+        let timeout = self.timeout;
+        Box::pin(retry_request(self, move |cluster| {
+            cluster.get_region_by_id(id, timeout)
+        }))
+    }
+
+    pub fn get_store(self: Arc<Self>, id: StoreId) -> BoxFuture<'static, Result<metapb::Store>> {
+        let timeout = self.timeout;
+        Box::pin(retry_request(self, move |cluster| {
+            cluster.get_store(id, timeout)
+        }))
+    }
+
+    pub fn reconnect(&self, interval: u64) -> Result<()> {
+        if let Some(cluster) =
+            self.connection
+                .reconnect(&self.cluster.read().unwrap(), interval, self.timeout)?
+        {
+            *self.cluster.write().unwrap() = cluster;
+        }
+        Ok(())
+    }
+
+    pub fn with_cluster<T, F: Fn(&Cluster) -> T>(&self, f: F) -> T {
+        f(&self.cluster.read().unwrap())
+    }
+
+    #[allow(dead_code)]
+    pub fn get_all_stores(self: Arc<Self>) -> BoxFuture<'static, Result<Vec<metapb::Store>>> {
+        let timeout = self.timeout;
+        Box::pin(retry_request(self, move |cluster| {
+            cluster.get_all_stores(timeout)
+        }))
+    }
+
+    pub fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
+        // FIXME: retry or reconnect on error
+        Box::pin(self.cluster.read().unwrap().get_timestamp())
+    }
+}
+
+impl fmt::Debug for RetryClient {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("pd::RetryClient")
+            .field("cluster_id", &self.cluster.read().unwrap().id)
+            .field("timeout", &self.timeout)
+            .finish()
+    }
+}
+
+fn retry_request<Resp, Func, RespFuture>(
+    client: Arc<RetryClient>,
+    func: Func,
+) -> RetryRequest<impl Future<Output = Result<Resp>>>
+where
+    Resp: Send + 'static,
+    Func: Fn(&Cluster) -> RespFuture + Send + 'static,
+    RespFuture: Future<Output = Result<Resp>> + Send + 'static,
+{
+    let mut req = Request::new(func, client);
+    RetryRequest {
+        reconnect_count: LEADER_CHANGE_RETRY,
+        future: req
+            .reconnect_if_needed()
+            .map_err(|_| internal_err!("failed to reconnect"))
+            .and_then(move |_| req.send_and_receive()),
+    }
+}
+
+/// A future which will retry a request up to `reconnect_count` times or until it
+/// succeeds.
+struct RetryRequest<Fut> {
+    reconnect_count: usize,
+    future: Fut,
+}
+
+struct Request<Func> {
+    // We keep track of requests sent and after `MAX_REQUEST_COUNT` we reconnect.
+    request_sent: usize,
+
+    client: Arc<RetryClient>,
+    timer: Handle,
+
+    // A function which makes an async request.
+    func: Func,
+}
+
+impl<Resp, Fut> Future for RetryRequest<Fut>
+where
+    Resp: Send + 'static,
+    Fut: Future<Output = Result<Resp>> + Send + 'static,
+{
+    type Output = Result<Resp>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Resp>> {
+        unsafe {
+            let this = Pin::get_unchecked_mut(self);
+            if this.reconnect_count == 0 {
+                return Poll::Ready(Err(internal_err!("failed to send request")));
+            }
+
+            debug!("reconnect remains: {}", this.reconnect_count);
+            this.reconnect_count -= 1;
+            let resp = ready!(Pin::new_unchecked(&mut this.future).poll(cx))?;
+            Poll::Ready(Ok(resp))
+        }
+    }
+}
+
+impl<Resp, Func, RespFuture> Request<Func>
+where
+    Resp: Send + 'static,
+    Func: Fn(&Cluster) -> RespFuture + Send + 'static,
+    RespFuture: Future<Output = Result<Resp>> + Send + 'static,
+{
+    fn new(func: Func, client: Arc<RetryClient>) -> Self {
+        Request {
+            request_sent: 0,
+            client,
+            timer: GLOBAL_TIMER_HANDLE.clone(),
+            func,
+        }
+    }
+
+    fn reconnect_if_needed(&mut self) -> impl Future<Output = std::result::Result<(), ()>> + Send {
+        if self.request_sent < MAX_REQUEST_COUNT {
+            return future::Either::Left(future::ok(()));
+        }
+
+        // FIXME: should not block the core.
+        match self.client.reconnect(RECONNECT_INTERVAL_SEC) {
+            Ok(_) => {
+                self.request_sent = 0;
+                future::Either::Left(future::ok(()))
+            }
+            Err(_) => future::Either::Right(
+                Compat01As03::new(
+                    self.timer
+                        .delay(Instant::now() + Duration::from_secs(RECONNECT_INTERVAL_SEC)),
+                )
+                .map(|_| Err(())),
+            ),
+        }
+    }
+
+    fn send_and_receive(&mut self) -> impl Future<Output = Result<Resp>> + Send {
+        self.request_sent += 1;
+        debug!("request sent: {}", self.request_sent);
+
+        self.client.with_cluster(&self.func)
+    }
+}

+ 1 - 2
src/rpc/pd/timestamp.rs

@@ -11,8 +11,7 @@
 //! single `TsoRequest` to the PD server. The other future receives `TsoResponse`s from the PD
 //! server and allocates timestamps for the requests.
 
-use super::Timestamp;
-use crate::{Error, Result};
+use crate::{transaction::Timestamp, Error, Result};
 
 use futures::{
     channel::{mpsc, oneshot},

+ 2 - 2
src/proptests/raw.rs

@@ -12,7 +12,7 @@ proptest! {
     fn point(
         pair in any::<KvPair>(),
     ) {
-        let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();
+        let client = Client::new(Config::new(pd_addrs())).unwrap();
 
         block_on(
             client.put(pair.key().clone(), pair.value().clone())
@@ -36,7 +36,7 @@ proptest! {
     fn batch(
         kvs in arb_batch(any::<KvPair>(), None),
     ) {
-        let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();
+        let client = Client::new(Config::new(pd_addrs())).unwrap();
         let keys = kvs.iter().map(|kv| kv.key()).cloned().collect::<Vec<_>>();
 
         block_on(

+ 177 - 212
src/raw/client.rs

@@ -1,128 +1,46 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
-use super::ColumnFamily;
-use crate::{rpc::RpcClient, BoundRange, Config, Error, Key, KvPair, Result, Value};
+use super::{
+    requests::{
+        RawBatchDelete, RawBatchGet, RawBatchPut, RawBatchScan, RawDelete, RawDeleteRange, RawGet,
+        RawPut, RawRequest, RawScan,
+    },
+    ColumnFamily,
+};
+use crate::{pd::PdRpcClient, BoundRange, Config, Error, Key, KvPair, Result, Value};
 
-use derive_new::new;
 use futures::future::Either;
 use futures::prelude::*;
-use futures::task::{Context, Poll};
-use std::{pin::Pin, sync::Arc, u32};
+use std::{sync::Arc, u32};
 
 const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
 
 /// The TiKV raw [`Client`](Client) is used to issue requests to the TiKV server and PD cluster.
 #[derive(Clone)]
 pub struct Client {
-    rpc: Arc<RpcClient>,
+    rpc: Arc<PdRpcClient>,
     cf: Option<ColumnFamily>,
     key_only: bool,
 }
 
-// The macros below make writing `impl Client` concise and hopefully easy to
-// read. Otherwise, the important bits get lost in boilerplate.
-
-// `request` and `scan_request` define public functions which return a future for
-// a request. When using them, supply the name of the function and the names of
-// arguments, the name of the function on the RPC client, and the `Output` type
-// of the returned future.
-macro_rules! request {
-    ($fn_name:ident ($($args:ident),*), $rpc_name:ident, $output:ty) => {
-        pub fn $fn_name(
-            &self,
-            $($args: arg_type!($args),)*
-        ) -> impl Future<Output = Result<$output>> {
-            let this = self.clone();
-            this.rpc.$rpc_name($(arg_convert!($args, $args)),*, this.cf)
-        }
-    }
-}
-
-macro_rules! scan_request {
-    ($fn_name:ident ($($args:ident),*), $rpc_name:ident, $output:ty) => {
-        pub fn $fn_name(
-            &self,
-            $($args: arg_type!($args),)*
-            limit: u32,
-        ) -> impl Future<Output = Result<$output>> {
-            if limit > MAX_RAW_KV_SCAN_LIMIT {
-                Either::Right(future::err(Error::max_scan_limit_exceeded(
-                    limit,
-                    MAX_RAW_KV_SCAN_LIMIT,
-                )))
-            } else {
-                let this = self.clone();
-                Either::Left(this.rpc.$rpc_name($(arg_convert!($args, $args)),*, limit, this.key_only, this.cf))
-            }
-        }
-    }
-}
-
-// The following macros are used by the above macros to understand how arguments
-// should be treated. `self` and `limit` (in scan requests) are treated separately.
-// Skip to the use of `args!` to see the definitions.
-//
-// When using arguments in the `request` macros, we need to use their name, type,
-// and be able to convert them for the RPC client functions. There are two kinds
-// of argument - individual values, and collections of values. In the first case
-// we always use `Into`, and in the second we take an iterator which we also
-// also transform using `Into::into`. This gives users maximum flexibility.
-//
-// `arg_type` defines the type for values (`into`) and collections (`iter`).
-// Likewise, `arg_convert` rule defines how to convert the argument into the more
-// concrete type required by the RPC client. Both macros are created by `args`.
-
-macro_rules! arg_type_rule {
-    (into<$ty:ty>) => (impl Into<$ty>);
-    (iter<$ty:ty>) => (impl IntoIterator<Item = impl Into<$ty>>);
-}
-
-macro_rules! arg_convert_rule {
-    (into $id:ident) => {
-        $id.into()
-    };
-    (iter $id:ident) => {
-        $id.into_iter().map(Into::into).collect()
-    };
-}
-
-// `$i` is the name of the argument (e.g, `key`)
-// `$kind` is either `iter` or `into`.
-// `$ty` is the concrete type of the argument.
-macro_rules! args {
-    ($($i:ident: $kind:ident<$ty:ty>;)*) => {
-        macro_rules! arg_type {
-            $(($i) => (arg_type_rule!($kind<$ty>));)*
-        }
-        macro_rules! arg_convert {
-            $(($i, $id : ident) => (arg_convert_rule!($kind $id));)*
-        }
-    }
-}
-
-args! {
-    key: into<Key>;
-    keys: iter<Key>;
-    value: into<Value>;
-    pairs: iter<KvPair>;
-    range: into<BoundRange>;
-    ranges: iter<BoundRange>;
-}
-
 impl Client {
-    /// Create a new [`Client`](Client) once the [`Connect`](Connect) resolves.
+    /// Create a new [`Client`](Client).
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Config, raw::Client};
+    /// # use tikv_client::{Config, RawClient};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// let connect = Client::connect(Config::default());
-    /// let client = connect.await.unwrap();
+    /// let client = RawClient::new(Config::default()).unwrap();
     /// # });
     /// ```
-    pub fn connect(config: Config) -> Connect {
-        Connect::new(config)
+    pub fn new(config: Config) -> Result<Client> {
+        let rpc = Arc::new(PdRpcClient::connect(&config)?);
+        Ok(Client {
+            rpc,
+            cf: None,
+            key_only: false,
+        })
     }
 
     /// Set the column family of requests.
@@ -132,11 +50,10 @@ impl Client {
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Config, raw::Client};
+    /// # use tikv_client::{Config, RawClient};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// let connect = Client::connect(Config::default());
-    /// let client = connect.await.unwrap();
+    /// let client = RawClient::new(Config::default()).unwrap();
     /// let get_request = client.with_cf("write").get("foo".to_owned());
     /// # });
     /// ```
@@ -157,11 +74,10 @@ impl Client {
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Config, raw::Client, ToOwnedRange};
+    /// # use tikv_client::{Config, RawClient, ToOwnedRange};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// let connect = Client::connect(Config::default());
-    /// let client = connect.await.unwrap();
+    /// let client = RawClient::new(Config::default()).unwrap();
     /// let scan_request = client.with_key_only(true).scan(("TiKV"..="TiDB").to_owned(), 2);
     /// # });
     /// ```
@@ -173,205 +89,254 @@ impl Client {
         }
     }
 
-    /// Create a new get request.
+    /// Create a new 'get' request.
     ///
     /// Once resolved this request will result in the fetching of the value associated with the
     /// given key.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Value, Config, raw::Client};
+    /// # use tikv_client::{Value, Config, RawClient};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
-    /// let key = "TiKV";
-    /// let req = connected_client.get(key);
+    /// # let client = RawClient::new(Config::default()).unwrap();
+    /// let key = "TiKV".to_owned();
+    /// let req = client.get(key);
     /// let result: Option<Value> = req.await.unwrap();
     /// # });
     /// ```
-    request!(get(key), raw_get, Option<Value>);
+    pub fn get(&self, key: impl Into<Key>) -> impl Future<Output = Result<Option<Value>>> {
+        RawGet {
+            key: key.into(),
+            cf: self.cf.clone(),
+        }
+        .execute(self.rpc.clone())
+    }
 
-    /// Create a new batch get request.
+    /// Create a new 'batch get' request.
     ///
     /// Once resolved this request will result in the fetching of the values associated with the
     /// given keys.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{KvPair, Config, raw::Client};
+    /// # use tikv_client::{KvPair, Config, RawClient};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
-    /// let keys = vec!["TiKV", "TiDB"];
-    /// let req = connected_client.batch_get(keys);
+    /// # let client = RawClient::new(Config::default()).unwrap();
+    /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
+    /// let req = client.batch_get(keys);
     /// let result: Vec<KvPair> = req.await.unwrap();
     /// # });
     /// ```
-    request!(batch_get(keys), raw_batch_get, Vec<KvPair>);
+    pub fn batch_get(
+        &self,
+        keys: impl IntoIterator<Item = impl Into<Key>>,
+    ) -> impl Future<Output = Result<Vec<KvPair>>> {
+        RawBatchGet {
+            keys: keys.into_iter().map(Into::into).collect(),
+            cf: self.cf.clone(),
+        }
+        .execute(self.rpc.clone())
+    }
 
-    /// Create a new [`Put`](Put) request.
+    /// Create a new 'put' request.
     ///
     /// Once resolved this request will result in the setting of the value associated with the given key.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Key, Value, Config, raw::Client};
+    /// # use tikv_client::{Key, Value, Config, RawClient};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
-    /// let key = "TiKV";
-    /// let val = "TiKV";
-    /// let req = connected_client.put(key, val);
+    /// # let client = RawClient::new(Config::default()).unwrap();
+    /// let key = "TiKV".to_owned();
+    /// let val = "TiKV".to_owned();
+    /// let req = client.put(key, val);
     /// let result: () = req.await.unwrap();
     /// # });
     /// ```
-    request!(put(key, value), raw_put, ());
+    pub fn put(
+        &self,
+        key: impl Into<Key>,
+        value: impl Into<Value>,
+    ) -> impl Future<Output = Result<()>> {
+        let rpc = self.rpc.clone();
+        future::ready(RawPut::new(key, value, &self.cf)).and_then(|put| put.execute(rpc))
+    }
 
-    /// Create a new [`BatchPut`](BatchPut) request.
+    /// Create a new 'batch put' request.
     ///
     /// Once resolved this request will result in the setting of the value associated with the given key.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client};
+    /// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, RawClient, ToOwnedRange};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
-    /// let kvpair1 = ("PD", "Go");
-    /// let kvpair2 = ("TiKV", "Rust");
+    /// # let client = RawClient::new(Config::default()).unwrap();
+    /// let kvpair1 = ("PD".to_owned(), "Go".to_owned());
+    /// let kvpair2 = ("TiKV".to_owned(), "Rust".to_owned());
     /// let iterable = vec![kvpair1, kvpair2];
-    /// let req = connected_client.batch_put(iterable);
+    /// let req = client.batch_put(iterable);
     /// let result: () = req.await.unwrap();
     /// # });
     /// ```
-    request!(batch_put(pairs), raw_batch_put, ());
+    pub fn batch_put(
+        &self,
+        pairs: impl IntoIterator<Item = impl Into<KvPair>>,
+    ) -> impl Future<Output = Result<()>> {
+        let rpc = self.rpc.clone();
+        future::ready(RawBatchPut::new(pairs, &self.cf)).and_then(|put| put.execute(rpc))
+    }
 
-    /// Create a new [`Delete`](Delete) request.
+    /// Create a new 'delete' request.
     ///
     /// Once resolved this request will result in the deletion of the given key.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Key, Config, raw::Client};
+    /// # use tikv_client::{Key, Config, RawClient};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
-    /// let key = "TiKV";
-    /// let req = connected_client.delete(key);
+    /// # let client = RawClient::new(Config::default()).unwrap();
+    /// let key = "TiKV".to_owned();
+    /// let req = client.delete(key);
     /// let result: () = req.await.unwrap();
     /// # });
     /// ```
-    request!(delete(key), raw_delete, ());
+    pub fn delete(&self, key: impl Into<Key>) -> impl Future<Output = Result<()>> {
+        RawDelete {
+            key: key.into(),
+            cf: self.cf.clone(),
+        }
+        .execute(self.rpc.clone())
+    }
 
-    /// Create a new [`BatchDelete`](BatchDelete) request.
+    /// Create a new 'batch delete' request.
     ///
     /// Once resolved this request will result in the deletion of the given keys.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Config, raw::Client};
+    /// # use tikv_client::{Config, RawClient};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
-    /// let keys = vec!["TiKV", "TiDB"];
-    /// let req = connected_client.batch_delete(keys);
+    /// # let client = RawClient::new(Config::default()).unwrap();
+    /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
+    /// let req = client.batch_delete(keys);
     /// let result: () = req.await.unwrap();
     /// # });
     /// ```
-    request!(batch_delete(keys), raw_batch_delete, ());
+    pub fn batch_delete(
+        &self,
+        keys: impl IntoIterator<Item = impl Into<Key>>,
+    ) -> impl Future<Output = Result<()>> {
+        RawBatchDelete {
+            keys: keys.into_iter().map(Into::into).collect(),
+            cf: self.cf.clone(),
+        }
+        .execute(self.rpc.clone())
+    }
 
-    /// Create a new [`Scan`](Scan) request.
+    /// Create a new 'delete range' request.
     ///
-    /// Once resolved this request will result in a scanner over the given keys.
+    /// Once resolved this request will result in the deletion of all keys over the given range.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{KvPair, Config, raw::Client};
+    /// # use tikv_client::{Key, Config, RawClient, ToOwnedRange};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
+    /// # let client = RawClient::new(Config::default()).unwrap();
     /// let inclusive_range = "TiKV"..="TiDB";
-    /// let req = connected_client.scan(inclusive_range, 2);
-    /// let result: Vec<KvPair> = req.await.unwrap();
+    /// let req = client.delete_range(inclusive_range.to_owned());
+    /// let result: () = req.await.unwrap();
     /// # });
     /// ```
-    scan_request!(scan(range), raw_scan, Vec<KvPair>);
+    pub fn delete_range(&self, range: impl Into<BoundRange>) -> impl Future<Output = Result<()>> {
+        RawDeleteRange {
+            range: range.into(),
+            cf: self.cf.clone(),
+        }
+        .execute(self.rpc.clone())
+    }
 
-    /// Create a new [`BatchScan`](BatchScan) request.
+    /// Create a new 'scan' request.
     ///
-    /// Once resolved this request will result in a set of scanners over the given keys.
+    /// Once resolved this request will result in a scanner over the given keys.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Key, Config, raw::Client};
+    /// # use tikv_client::{KvPair, Config, RawClient, ToOwnedRange};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
-    /// let inclusive_range1 = "TiDB"..="TiKV";
-    /// let inclusive_range2 = "TiKV"..="TiSpark";
-    /// let iterable = vec![inclusive_range1, inclusive_range2];
-    /// let req = connected_client.batch_scan(iterable, 2);
-    /// let result = req.await;
+    /// # let client = RawClient::new(Config::default()).unwrap();
+    /// let inclusive_range = "TiKV"..="TiDB";
+    /// let req = client.scan(inclusive_range.to_owned(), 2);
+    /// let result: Vec<KvPair> = req.await.unwrap();
     /// # });
     /// ```
-    scan_request!(batch_scan(ranges), raw_batch_scan, Vec<KvPair>);
+    pub fn scan(
+        &self,
+        range: impl Into<BoundRange>,
+        limit: u32,
+    ) -> impl Future<Output = Result<Vec<KvPair>>> {
+        if limit > MAX_RAW_KV_SCAN_LIMIT {
+            Either::Right(future::err(Error::max_scan_limit_exceeded(
+                limit,
+                MAX_RAW_KV_SCAN_LIMIT,
+            )))
+        } else {
+            Either::Left(
+                RawScan {
+                    range: range.into(),
+                    limit,
+                    key_only: self.key_only,
+                    cf: self.cf.clone(),
+                }
+                .execute(self.rpc.clone()),
+            )
+        }
+    }
 
-    /// Create a new [`DeleteRange`](DeleteRange) request.
+    /// Create a new 'batch scan' request.
     ///
-    /// Once resolved this request will result in the deletion of all keys over the given range.
+    /// Once resolved this request will result in a set of scanners over the given keys.
     ///
     /// ```rust,no_run
     /// # #![feature(async_await)]
-    /// # use tikv_client::{Key, Config, raw::Client};
+    /// # use tikv_client::{Key, Config, RawClient, ToOwnedRange};
     /// # use futures::prelude::*;
     /// # futures::executor::block_on(async {
-    /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
-    /// # let connected_client = connecting_client.await.unwrap();
-    /// let inclusive_range = "TiKV"..="TiDB";
-    /// let req = connected_client.delete_range(inclusive_range);
-    /// let result: () = req.await.unwrap();
+    /// # let client = RawClient::new(Config::default()).unwrap();
+    /// let inclusive_range1 = "TiDB"..="TiKV";
+    /// let inclusive_range2 = "TiKV"..="TiSpark";
+    /// let iterable = vec![inclusive_range1.to_owned(), inclusive_range2.to_owned()];
+    /// let req = client.batch_scan(iterable, 2);
+    /// let result = req.await;
     /// # });
     /// ```
-    request!(delete_range(range), raw_delete_range, ());
-}
-
-/// An unresolved [`Client`](Client) connection to a TiKV cluster.
-///
-/// Once resolved it will result in a connected [`Client`](Client).
-///
-/// ```rust,no_run
-/// # #![feature(async_await)]
-/// use tikv_client::{Config, raw::{Client, Connect}};
-/// use futures::prelude::*;
-///
-/// # futures::executor::block_on(async {
-/// let connect: Connect = Client::connect(Config::default());
-/// let client: Client = connect.await.unwrap();
-/// # });
-/// ```
-#[derive(new)]
-pub struct Connect {
-    config: Config,
-}
-
-impl Future for Connect {
-    type Output = Result<Client>;
-
-    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
-        let config = &self.config;
-        let rpc = Arc::new(RpcClient::connect(config)?);
-        Poll::Ready(Ok(Client {
-            rpc,
-            cf: None,
-            key_only: false,
-        }))
+    pub fn batch_scan(
+        &self,
+        ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
+        each_limit: u32,
+    ) -> impl Future<Output = Result<Vec<KvPair>>> {
+        if each_limit > MAX_RAW_KV_SCAN_LIMIT {
+            Either::Right(future::err(Error::max_scan_limit_exceeded(
+                each_limit,
+                MAX_RAW_KV_SCAN_LIMIT,
+            )))
+        } else {
+            Either::Left(
+                RawBatchScan {
+                    ranges: ranges.into_iter().map(Into::into).collect(),
+                    each_limit,
+                    key_only: self.key_only,
+                    cf: self.cf.clone(),
+                }
+                .execute(self.rpc.clone()),
+            )
+        }
     }
 }

+ 6 - 2
src/raw/mod.rs

@@ -10,11 +10,15 @@
 //! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
 //!
 
-pub use self::client::{Client, Connect};
+pub use self::client::Client;
+pub(crate) use requests::RawRequest;
+#[cfg(test)]
+pub use requests::*;
 
 use std::fmt;
 
 mod client;
+mod requests;
 
 /// A [`ColumnFamily`](ColumnFamily) is an optional parameter for [`raw::Client`](Client) requests.
 ///
@@ -31,7 +35,7 @@ mod client;
 /// The best (and only) way to create a [`ColumnFamily`](ColumnFamily) is via the `From` implementation:
 ///
 /// ```rust
-/// # use tikv_client::raw::ColumnFamily;
+/// # use tikv_client::ColumnFamily;
 ///
 /// let cf = ColumnFamily::from("write");
 /// let cf = ColumnFamily::from(String::from("write"));

+ 627 - 0
src/raw/requests.rs

@@ -0,0 +1,627 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::{
+    kv_client::{HasError, KvClient, KvRawRequest, RpcFnType, Store},
+    pd::PdClient,
+    raw::ColumnFamily,
+    BoundRange, Error, Key, KvPair, Result, Value,
+};
+
+use futures::future::BoxFuture;
+use futures::prelude::*;
+use futures::stream::BoxStream;
+use grpcio::CallOption;
+use kvproto::kvrpcpb;
+use kvproto::tikvpb::TikvClient;
+use std::mem;
+use std::sync::Arc;
+
+pub trait RawRequest: Sync + Send + 'static + Sized + Clone {
+    type Result;
+    type RpcRequest;
+    type RpcResponse: HasError + Clone + Send + 'static;
+    type KeyType;
+    const REQUEST_NAME: &'static str;
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse>;
+
+    fn execute(
+        mut self,
+        pd_client: Arc<impl PdClient>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        let stores = self.store_stream(pd_client);
+        Self::reduce(
+            stores
+                .and_then(move |(key, store)| {
+                    let request = self.clone().into_request(key, &store);
+                    self.mock_dispatch(&request, store.call_options())
+                        .unwrap_or_else(|| store.dispatch::<Self>(&request, store.call_options()))
+                })
+                .map_ok(move |r| Self::map_result(r))
+                .boxed(),
+        )
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>>;
+
+    fn into_request<KvC: KvClient>(
+        self,
+        key: Self::KeyType,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest;
+
+    fn map_result(result: Self::RpcResponse) -> Self::Result;
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>>;
+}
+
+/// Permits easy mocking of rpc calls.
+pub trait MockDispatch: RawRequest {
+    fn mock_dispatch(
+        &self,
+        _request: &Self::RpcRequest,
+        _opt: CallOption,
+    ) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
+        None
+    }
+}
+
+impl<T: RawRequest> MockDispatch for T {}
+
+#[derive(Clone)]
+pub struct RawGet {
+    pub key: Key,
+    pub cf: Option<ColumnFamily>,
+}
+
+impl RawRequest for RawGet {
+    type Result = Option<Value>;
+    type RpcRequest = kvrpcpb::RawGetRequest;
+    type RpcResponse = kvrpcpb::RawGetResponse;
+    type KeyType = Key;
+    const REQUEST_NAME: &'static str = "raw_get";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::raw_get_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        self,
+        key: Self::KeyType,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_key(key.into());
+        req.maybe_set_cf(self.cf);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
+        let key = self.key.clone();
+        pd_client
+            .store_for_key(&self.key)
+            .map_ok(move |store| (key, store))
+            .into_stream()
+            .boxed()
+    }
+
+    fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
+        let result: Value = resp.take_value().into();
+        if result.is_empty() {
+            None
+        } else {
+            Some(result)
+        }
+    }
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results
+            .into_future()
+            .map(|(f, _)| f.expect("no results should be impossible"))
+            .boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct RawBatchGet {
+    pub keys: Vec<Key>,
+    pub cf: Option<ColumnFamily>,
+}
+
+impl RawRequest for RawBatchGet {
+    type Result = Vec<KvPair>;
+    type RpcRequest = kvrpcpb::RawBatchGetRequest;
+    type RpcResponse = kvrpcpb::RawBatchGetResponse;
+    type KeyType = Vec<Key>;
+    const REQUEST_NAME: &'static str = "raw_batch_get";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> =
+        TikvClient::raw_batch_get_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        self,
+        keys: Self::KeyType,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_keys(keys.into_iter().map(Into::into).collect());
+        req.maybe_set_cf(self.cf);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
+        let mut keys = Vec::new();
+        mem::swap(&mut keys, &mut self.keys);
+
+        pd_client
+            .clone()
+            .group_keys_by_region(keys.into_iter())
+            .and_then(move |(region_id, key)| {
+                pd_client
+                    .clone()
+                    .store_for_id(region_id)
+                    .map_ok(move |store| (key, store))
+            })
+            .boxed()
+    }
+
+    fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
+        resp.take_pairs().into_iter().map(Into::into).collect()
+    }
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results.try_concat().boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct RawPut {
+    pub key: Key,
+    pub value: Value,
+    pub cf: Option<ColumnFamily>,
+}
+
+impl RawPut {
+    pub fn new(
+        key: impl Into<Key>,
+        value: impl Into<Value>,
+        cf: &Option<ColumnFamily>,
+    ) -> Result<RawPut> {
+        let value = value.into();
+        if value.is_empty() {
+            return Err(Error::empty_value());
+        }
+
+        let key = key.into();
+        Ok(RawPut {
+            key,
+            value,
+            cf: cf.clone(),
+        })
+    }
+}
+
+impl RawRequest for RawPut {
+    type Result = ();
+    type RpcRequest = kvrpcpb::RawPutRequest;
+    type RpcResponse = kvrpcpb::RawPutResponse;
+    type KeyType = KvPair;
+    const REQUEST_NAME: &'static str = "raw_put";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::raw_put_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        self,
+        key: Self::KeyType,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_key(key.0.into());
+        req.set_value(key.1.into());
+        req.maybe_set_cf(self.cf);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
+        let kv = (self.key.clone(), self.value.clone()).into();
+        pd_client
+            .store_for_key(&self.key)
+            .map_ok(move |store| (kv, store))
+            .into_stream()
+            .boxed()
+    }
+
+    fn map_result(_: Self::RpcResponse) -> Self::Result {}
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results
+            .into_future()
+            .map(|(f, _)| f.expect("no results should be impossible"))
+            .boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct RawBatchPut {
+    pub pairs: Vec<KvPair>,
+    pub cf: Option<ColumnFamily>,
+}
+
+impl RawBatchPut {
+    pub fn new(
+        pairs: impl IntoIterator<Item = impl Into<KvPair>>,
+        cf: &Option<ColumnFamily>,
+    ) -> Result<RawBatchPut> {
+        let pairs: Vec<KvPair> = pairs.into_iter().map(Into::into).collect();
+        if pairs.iter().any(|pair| pair.value().is_empty()) {
+            return Err(Error::empty_value());
+        }
+
+        Ok(RawBatchPut {
+            pairs,
+            cf: cf.clone(),
+        })
+    }
+}
+
+impl RawRequest for RawBatchPut {
+    type Result = ();
+    type RpcRequest = kvrpcpb::RawBatchPutRequest;
+    type RpcResponse = kvrpcpb::RawBatchPutResponse;
+    type KeyType = Vec<KvPair>;
+    const REQUEST_NAME: &'static str = "raw_batch_put";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> =
+        TikvClient::raw_batch_put_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        self,
+        pairs: Self::KeyType,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_pairs(pairs.into_iter().map(Into::into).collect());
+        req.maybe_set_cf(self.cf);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
+        let mut pairs = Vec::new();
+        mem::swap(&mut pairs, &mut self.pairs);
+
+        pd_client
+            .clone()
+            .group_keys_by_region(pairs.into_iter())
+            .and_then(move |(region_id, pair)| {
+                pd_client
+                    .clone()
+                    .store_for_id(region_id)
+                    .map_ok(move |store| (pair, store))
+            })
+            .boxed()
+    }
+
+    fn map_result(_: Self::RpcResponse) -> Self::Result {}
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results.try_collect().boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct RawDelete {
+    pub key: Key,
+    pub cf: Option<ColumnFamily>,
+}
+
+impl RawRequest for RawDelete {
+    type Result = ();
+    type RpcRequest = kvrpcpb::RawDeleteRequest;
+    type RpcResponse = kvrpcpb::RawDeleteResponse;
+    type KeyType = Key;
+    const REQUEST_NAME: &'static str = "raw_delete";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::raw_delete_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        self,
+        key: Self::KeyType,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_key(key.into());
+        req.maybe_set_cf(self.cf);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
+        let key = self.key.clone();
+        pd_client
+            .store_for_key(&self.key)
+            .map_ok(move |store| (key, store))
+            .into_stream()
+            .boxed()
+    }
+
+    fn map_result(_: Self::RpcResponse) -> Self::Result {}
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results
+            .into_future()
+            .map(|(f, _)| f.expect("no results should be impossible"))
+            .boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct RawBatchDelete {
+    pub keys: Vec<Key>,
+    pub cf: Option<ColumnFamily>,
+}
+
+impl RawRequest for RawBatchDelete {
+    type Result = ();
+    type RpcRequest = kvrpcpb::RawBatchDeleteRequest;
+    type RpcResponse = kvrpcpb::RawBatchDeleteResponse;
+    type KeyType = Vec<Key>;
+    const REQUEST_NAME: &'static str = "raw_batch_delete";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> =
+        TikvClient::raw_batch_delete_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        self,
+        keys: Self::KeyType,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_keys(keys.into_iter().map(Into::into).collect());
+        req.maybe_set_cf(self.cf);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
+        let mut keys = Vec::new();
+        mem::swap(&mut keys, &mut self.keys);
+
+        pd_client
+            .clone()
+            .group_keys_by_region(keys.into_iter())
+            .and_then(move |(region_id, key)| {
+                pd_client
+                    .clone()
+                    .store_for_id(region_id)
+                    .map_ok(move |store| (key, store))
+            })
+            .boxed()
+    }
+
+    fn map_result(_: Self::RpcResponse) -> Self::Result {}
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results.try_collect().boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct RawDeleteRange {
+    pub range: BoundRange,
+    pub cf: Option<ColumnFamily>,
+}
+
+impl RawRequest for RawDeleteRange {
+    type Result = ();
+    type RpcRequest = kvrpcpb::RawDeleteRangeRequest;
+    type RpcResponse = kvrpcpb::RawDeleteRangeResponse;
+    type KeyType = (Key, Key);
+    const REQUEST_NAME: &'static str = "raw_delete_range";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> =
+        TikvClient::raw_delete_range_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        self,
+        (start_key, end_key): Self::KeyType,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_start_key(start_key.into());
+        req.set_end_key(end_key.into());
+        req.maybe_set_cf(self.cf);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::KeyType, Store<PdC::KvClient>)>> {
+        let range = self.range.clone();
+        pd_client
+            .stores_for_range(range)
+            .map_ok(move |store| {
+                // TODO should be bounded by self.range
+                let range = store.region.range();
+                (range, store)
+            })
+            .into_stream()
+            .boxed()
+    }
+
+    fn map_result(_: Self::RpcResponse) -> Self::Result {}
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results
+            .into_future()
+            .map(|(f, _)| f.expect("no results should be impossible"))
+            .boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct RawScan {
+    pub range: BoundRange,
+    // TODO this limit is currently treated as a per-region limit, not a total
+    // limit.
+    pub limit: u32,
+    pub key_only: bool,